xref: /linux/drivers/md/dm-vdo/slab-depot.c (revision d358e5254674b70f34c847715ca509e46eb81e6f)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "slab-depot.h"
7 
8 #include <linux/atomic.h>
9 #include <linux/bio.h>
10 #include <linux/err.h>
11 #include <linux/log2.h>
12 #include <linux/min_heap.h>
13 #include <linux/minmax.h>
14 
15 #include "logger.h"
16 #include "memory-alloc.h"
17 #include "numeric.h"
18 #include "permassert.h"
19 #include "string-utils.h"
20 
21 #include "action-manager.h"
22 #include "admin-state.h"
23 #include "completion.h"
24 #include "constants.h"
25 #include "data-vio.h"
26 #include "encodings.h"
27 #include "io-submitter.h"
28 #include "physical-zone.h"
29 #include "priority-table.h"
30 #include "recovery-journal.h"
31 #include "repair.h"
32 #include "status-codes.h"
33 #include "types.h"
34 #include "vdo.h"
35 #include "vio.h"
36 #include "wait-queue.h"
37 
38 static const u64 BYTES_PER_WORD = sizeof(u64);
39 static const bool NORMAL_OPERATION = true;
40 
41 /**
42  * get_lock() - Get the lock object for a slab journal block by sequence number.
43  * @journal: The vdo_slab journal to retrieve from.
44  * @sequence_number: Sequence number of the block.
45  *
46  * Return: The lock object for the given sequence number.
47  */
48 static inline struct journal_lock * __must_check get_lock(struct slab_journal *journal,
49 							  sequence_number_t sequence_number)
50 {
51 	return &journal->locks[sequence_number % journal->size];
52 }
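/*
 * A worked example of the ring indexing above, assuming journal->size is 224
 * (the real size comes from the slab configuration): sequence numbers 52, 276,
 * and 500 all map to &journal->locks[52], since 276 % 224 == 52 and
 * 500 % 224 == 52. Each lock slot is thus reused every time the on-disk slab
 * journal wraps.
 */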
53 
54 static bool is_slab_open(struct vdo_slab *slab)
55 {
56 	return (!vdo_is_state_quiescing(&slab->state) &&
57 		!vdo_is_state_quiescent(&slab->state));
58 }
59 
60 /**
61  * must_make_entries_to_flush() - Check whether there are entry waiters which should delay a flush.
62  * @journal: The journal to check.
63  *
 64  * Return: true if there are entry waiters and the slab is not currently rebuilding.
65  */
66 static inline bool __must_check must_make_entries_to_flush(struct slab_journal *journal)
67 {
68 	return ((journal->slab->status != VDO_SLAB_REBUILDING) &&
69 		vdo_waitq_has_waiters(&journal->entry_waiters));
70 }
71 
72 /**
73  * is_reaping() - Check whether a reap is currently in progress.
74  * @journal: The journal which may be reaping.
75  *
76  * Return: true if the journal is reaping.
77  */
78 static inline bool __must_check is_reaping(struct slab_journal *journal)
79 {
80 	return (journal->head != journal->unreapable);
81 }
82 
83 /**
84  * initialize_tail_block() - Initialize tail block as a new block.
85  * @journal: The journal whose tail block is being initialized.
86  */
87 static void initialize_tail_block(struct slab_journal *journal)
88 {
89 	struct slab_journal_block_header *header = &journal->tail_header;
90 
91 	header->sequence_number = journal->tail;
92 	header->entry_count = 0;
93 	header->has_block_map_increments = false;
94 }
95 
96 /**
97  * initialize_journal_state() - Set all journal fields appropriately to start journaling.
98  * @journal: The journal to be reset, based on its tail sequence number.
99  */
100 static void initialize_journal_state(struct slab_journal *journal)
101 {
102 	journal->unreapable = journal->head;
103 	journal->reap_lock = get_lock(journal, journal->unreapable);
104 	journal->next_commit = journal->tail;
105 	journal->summarized = journal->last_summarized = journal->tail;
106 	initialize_tail_block(journal);
107 }
108 
109 /**
110  * block_is_full() - Check whether a journal block is full.
111  * @journal: The slab journal for the block.
112  *
113  * Return: True if the tail block is full.
114  */
115 static bool __must_check block_is_full(struct slab_journal *journal)
116 {
117 	journal_entry_count_t count = journal->tail_header.entry_count;
118 
119 	return (journal->tail_header.has_block_map_increments ?
120 		(journal->full_entries_per_block == count) :
121 		(journal->entries_per_block == count));
122 }
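/*
 * Note: full_entries_per_block is the smaller of the two limits because a
 * block containing block map increments must also cover each entry with a bit
 * in the per-entry type bitmap (see encode_slab_journal_entry() below).
 */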
123 
124 static void add_entries(struct slab_journal *journal);
125 static void update_tail_block_location(struct slab_journal *journal);
126 static void release_journal_locks(struct vdo_waiter *waiter, void *context);
127 
128 /**
129  * is_slab_journal_blank() - Check whether a slab's journal is blank.
130  * @slab: The slab to check.
131  *
132  * A slab journal is blank if it has never had any entries recorded in it.
133  *
134  * Return: True if the slab's journal has never been modified.
135  */
136 static bool is_slab_journal_blank(const struct vdo_slab *slab)
137 {
138 	return ((slab->journal.tail == 1) &&
139 		(slab->journal.tail_header.entry_count == 0));
140 }
141 
142 /**
143  * mark_slab_journal_dirty() - Put a slab journal on the dirty list of its allocator in the correct
144  *                             order.
145  * @journal: The journal to be marked dirty.
146  * @lock: The recovery journal lock held by the slab journal.
147  */
148 static void mark_slab_journal_dirty(struct slab_journal *journal, sequence_number_t lock)
149 {
150 	struct slab_journal *dirty_journal;
151 	struct list_head *dirty_list = &journal->slab->allocator->dirty_slab_journals;
152 
153 	VDO_ASSERT_LOG_ONLY(journal->recovery_lock == 0, "slab journal was clean");
154 
155 	journal->recovery_lock = lock;
156 	list_for_each_entry_reverse(dirty_journal, dirty_list, dirty_entry) {
157 		if (dirty_journal->recovery_lock <= journal->recovery_lock)
158 			break;
159 	}
160 
161 	list_move_tail(&journal->dirty_entry, dirty_journal->dirty_entry.next);
162 }
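/*
 * A worked example of the insertion above: if the dirty list already holds
 * journals with recovery locks 3, 5, and 9, marking a journal dirty with lock
 * 7 walks the list backwards past 9, stops at 5, and places the new entry
 * between 5 and 9, so the list stays sorted by recovery lock.
 */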
163 
164 static void mark_slab_journal_clean(struct slab_journal *journal)
165 {
166 	journal->recovery_lock = 0;
167 	list_del_init(&journal->dirty_entry);
168 }
169 
170 static void check_if_slab_drained(struct vdo_slab *slab)
171 {
172 	bool read_only;
173 	struct slab_journal *journal = &slab->journal;
174 	const struct admin_state_code *code;
175 
176 	if (!vdo_is_state_draining(&slab->state) ||
177 	    must_make_entries_to_flush(journal) ||
178 	    is_reaping(journal) ||
179 	    journal->waiting_to_commit ||
180 	    !list_empty(&journal->uncommitted_blocks) ||
181 	    journal->updating_slab_summary ||
182 	    (slab->active_count > 0))
183 		return;
184 
185 	/* When not suspending or recovering, the slab must be clean. */
186 	code = vdo_get_admin_state_code(&slab->state);
187 	read_only = vdo_is_read_only(slab->allocator->depot->vdo);
188 	if (!read_only &&
189 	    vdo_waitq_has_waiters(&slab->dirty_blocks) &&
190 	    (code != VDO_ADMIN_STATE_SUSPENDING) &&
191 	    (code != VDO_ADMIN_STATE_RECOVERING))
192 		return;
193 
194 	vdo_finish_draining_with_result(&slab->state,
195 					(read_only ? VDO_READ_ONLY : VDO_SUCCESS));
196 }
197 
198 /* FULLNESS HINT COMPUTATION */
199 
200 /**
201  * compute_fullness_hint() - Translate a slab's free block count into a 'fullness hint' that can be
202  *                           stored in a slab_summary_entry's 7 bits that are dedicated to its free
203  *                           count.
 204  * @depot: The depot whose summary is being updated.
205  * @free_blocks: The number of free blocks.
206  *
207  * Note: the number of free blocks must be strictly less than 2^23 blocks, even though
208  * theoretically slabs could contain precisely 2^23 blocks; there is an assumption that at least
209  * one block is used by metadata. This assumption is necessary; otherwise, the fullness hint might
210  * overflow. The fullness hint formula is roughly (fullness >> 16) & 0x7f, but (2^23 >> 16) & 0x7f
211  * is 0, which would make it impossible to distinguish completely full from completely empty.
212  *
213  * Return: A fullness hint, which can be stored in 7 bits.
214  */
215 static u8 __must_check compute_fullness_hint(struct slab_depot *depot,
216 					     block_count_t free_blocks)
217 {
218 	block_count_t hint;
219 
220 	VDO_ASSERT_LOG_ONLY((free_blocks < (1 << 23)), "free blocks must be less than 2^23");
221 
222 	if (free_blocks == 0)
223 		return 0;
224 
225 	hint = free_blocks >> depot->hint_shift;
226 	return ((hint == 0) ? 1 : hint);
227 }
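/*
 * A worked example, assuming hint_shift is 16 as in the note above: a full
 * slab (free_blocks == 0) gets hint 0; free_blocks == 100 gives 100 >> 16 == 0,
 * which is rounded up to 1 so that a nearly full slab stays distinguishable
 * from a completely full one; free_blocks == 3,000,000 gives
 * 3000000 >> 16 == 45. Because free_blocks is strictly less than 2^23, the
 * hint always fits in the 7 bits reserved for it.
 */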
228 
229 /**
 230  * check_summary_drain_complete() - Check whether an allocator's summary has finished draining.
231  * @allocator: The allocator to check.
232  */
233 static void check_summary_drain_complete(struct block_allocator *allocator)
234 {
235 	if (!vdo_is_state_draining(&allocator->summary_state) ||
236 	    (allocator->summary_write_count > 0))
237 		return;
238 
239 	vdo_finish_operation(&allocator->summary_state,
240 			     (vdo_is_read_only(allocator->depot->vdo) ?
241 			      VDO_READ_ONLY : VDO_SUCCESS));
242 }
243 
244 /**
245  * notify_summary_waiters() - Wake all the waiters in a given queue.
246  * @allocator: The block allocator summary which owns the queue.
247  * @queue: The queue to notify.
248  */
249 static void notify_summary_waiters(struct block_allocator *allocator,
250 				   struct vdo_wait_queue *queue)
251 {
252 	int result = (vdo_is_read_only(allocator->depot->vdo) ?
253 		      VDO_READ_ONLY : VDO_SUCCESS);
254 
255 	vdo_waitq_notify_all_waiters(queue, NULL, &result);
256 }
257 
258 static void launch_write(struct slab_summary_block *summary_block);
259 
260 /**
261  * finish_updating_slab_summary_block() - Finish processing a block which attempted to write,
262  *                                        whether or not the attempt succeeded.
263  * @block: The block.
264  */
265 static void finish_updating_slab_summary_block(struct slab_summary_block *block)
266 {
267 	notify_summary_waiters(block->allocator, &block->current_update_waiters);
268 	block->writing = false;
269 	block->allocator->summary_write_count--;
270 	if (vdo_waitq_has_waiters(&block->next_update_waiters))
271 		launch_write(block);
272 	else
273 		check_summary_drain_complete(block->allocator);
274 }
275 
276 /**
277  * finish_update() - This is the callback for a successful summary block write.
278  * @completion: The write vio.
279  */
280 static void finish_update(struct vdo_completion *completion)
281 {
282 	struct slab_summary_block *block =
283 		container_of(as_vio(completion), struct slab_summary_block, vio);
284 
285 	atomic64_inc(&block->allocator->depot->summary_statistics.blocks_written);
286 	finish_updating_slab_summary_block(block);
287 }
288 
289 /**
290  * handle_write_error() - Handle an error writing a slab summary block.
291  * @completion: The write VIO.
292  */
293 static void handle_write_error(struct vdo_completion *completion)
294 {
295 	struct slab_summary_block *block =
296 		container_of(as_vio(completion), struct slab_summary_block, vio);
297 
298 	vio_record_metadata_io_error(as_vio(completion));
299 	vdo_enter_read_only_mode(completion->vdo, completion->result);
300 	finish_updating_slab_summary_block(block);
301 }
302 
303 static void write_slab_summary_endio(struct bio *bio)
304 {
305 	struct vio *vio = bio->bi_private;
306 	struct slab_summary_block *block =
307 		container_of(vio, struct slab_summary_block, vio);
308 
309 	continue_vio_after_io(vio, finish_update, block->allocator->thread_id);
310 }
311 
312 /**
313  * launch_write() - Write a slab summary block unless it is currently out for writing.
314  * @block: The block that needs to be committed.
315  */
316 static void launch_write(struct slab_summary_block *block)
317 {
318 	struct block_allocator *allocator = block->allocator;
319 	struct slab_depot *depot = allocator->depot;
320 	physical_block_number_t pbn;
321 
322 	if (block->writing)
323 		return;
324 
325 	allocator->summary_write_count++;
326 	vdo_waitq_transfer_all_waiters(&block->next_update_waiters,
327 				       &block->current_update_waiters);
328 	block->writing = true;
329 
330 	if (vdo_is_read_only(depot->vdo)) {
331 		finish_updating_slab_summary_block(block);
332 		return;
333 	}
334 
335 	memcpy(block->outgoing_entries, block->entries, VDO_BLOCK_SIZE);
336 
337 	/*
338 	 * Flush before writing to ensure that the slab journal tail blocks and reference updates
339 	 * covered by this summary update are stable. Otherwise, a subsequent recovery could
340 	 * encounter a slab summary update that refers to a slab journal tail block that has not
341 	 * actually been written. In such cases, the slab journal referenced will be treated as
342 	 * empty, causing any data within the slab which predates the existing recovery journal
343 	 * entries to be lost.
344 	 */
345 	pbn = (depot->summary_origin +
346 	       (VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE * allocator->zone_number) +
347 	       block->index);
348 	vdo_submit_metadata_vio(&block->vio, pbn, write_slab_summary_endio,
349 				handle_write_error, REQ_OP_WRITE | REQ_PREFLUSH);
350 }
351 
352 /**
353  * update_slab_summary_entry() - Update the entry for a slab.
354  * @slab: The slab whose entry is to be updated.
355  * @waiter: The waiter that is updating the summary.
356  * @tail_block_offset: The offset of the slab journal's tail block.
357  * @load_ref_counts: Whether the reference counts must be loaded from disk on the vdo load.
358  * @is_clean: Whether the slab is clean.
359  * @free_blocks: The number of free blocks.
360  */
361 static void update_slab_summary_entry(struct vdo_slab *slab, struct vdo_waiter *waiter,
362 				      tail_block_offset_t tail_block_offset,
363 				      bool load_ref_counts, bool is_clean,
364 				      block_count_t free_blocks)
365 {
366 	u8 index = slab->slab_number / VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK;
367 	struct block_allocator *allocator = slab->allocator;
368 	struct slab_summary_block *block = &allocator->summary_blocks[index];
369 	int result;
370 	struct slab_summary_entry *entry;
371 
372 	if (vdo_is_read_only(block->vio.completion.vdo)) {
373 		result = VDO_READ_ONLY;
374 		waiter->callback(waiter, &result);
375 		return;
376 	}
377 
378 	if (vdo_is_state_draining(&allocator->summary_state) ||
379 	    vdo_is_state_quiescent(&allocator->summary_state)) {
380 		result = VDO_INVALID_ADMIN_STATE;
381 		waiter->callback(waiter, &result);
382 		return;
383 	}
384 
385 	entry = &allocator->summary_entries[slab->slab_number];
386 	*entry = (struct slab_summary_entry) {
387 		.tail_block_offset = tail_block_offset,
388 		.load_ref_counts = (entry->load_ref_counts || load_ref_counts),
389 		.is_dirty = !is_clean,
390 		.fullness_hint = compute_fullness_hint(allocator->depot, free_blocks),
391 	};
392 	vdo_waitq_enqueue_waiter(&block->next_update_waiters, waiter);
393 	launch_write(block);
394 }
395 
396 /**
397  * finish_reaping() - Actually advance the head of the journal now that any necessary flushes are
398  *                    complete.
399  * @journal: The journal to be reaped.
400  */
401 static void finish_reaping(struct slab_journal *journal)
402 {
403 	journal->head = journal->unreapable;
404 	add_entries(journal);
405 	check_if_slab_drained(journal->slab);
406 }
407 
408 static void reap_slab_journal(struct slab_journal *journal);
409 
410 /**
411  * complete_reaping() - Finish reaping now that we have flushed the lower layer and then try
412  *                      reaping again in case we deferred reaping due to an outstanding vio.
413  * @completion: The flush vio.
414  */
415 static void complete_reaping(struct vdo_completion *completion)
416 {
417 	struct slab_journal *journal = completion->parent;
418 
419 	return_vio_to_pool(vio_as_pooled_vio(as_vio(completion)));
420 	finish_reaping(journal);
421 	reap_slab_journal(journal);
422 }
423 
424 /**
425  * handle_flush_error() - Handle an error flushing the lower layer.
426  * @completion: The flush vio.
427  */
428 static void handle_flush_error(struct vdo_completion *completion)
429 {
430 	vio_record_metadata_io_error(as_vio(completion));
431 	vdo_enter_read_only_mode(completion->vdo, completion->result);
432 	complete_reaping(completion);
433 }
434 
435 static void flush_endio(struct bio *bio)
436 {
437 	struct vio *vio = bio->bi_private;
438 	struct slab_journal *journal = vio->completion.parent;
439 
440 	continue_vio_after_io(vio, complete_reaping,
441 			      journal->slab->allocator->thread_id);
442 }
443 
444 /**
445  * flush_for_reaping() - A waiter callback for getting a vio with which to flush the lower layer
446  *                       prior to reaping.
447  * @waiter: The journal as a flush waiter.
448  * @context: The newly acquired flush vio.
449  */
450 static void flush_for_reaping(struct vdo_waiter *waiter, void *context)
451 {
452 	struct slab_journal *journal =
453 		container_of(waiter, struct slab_journal, flush_waiter);
454 	struct pooled_vio *pooled = context;
455 	struct vio *vio = &pooled->vio;
456 
457 	vio->completion.parent = journal;
458 	vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
459 }
460 
461 /**
462  * reap_slab_journal() - Conduct a reap on a slab journal to reclaim unreferenced blocks.
463  * @journal: The slab journal.
464  */
465 static void reap_slab_journal(struct slab_journal *journal)
466 {
467 	bool reaped = false;
468 
469 	if (is_reaping(journal)) {
470 		/* We already have a reap in progress so wait for it to finish. */
471 		return;
472 	}
473 
474 	if ((journal->slab->status != VDO_SLAB_REBUILT) ||
475 	    !vdo_is_state_normal(&journal->slab->state) ||
476 	    vdo_is_read_only(journal->slab->allocator->depot->vdo)) {
477 		/*
478 		 * We must not reap in the first two cases, and there's no point in read-only mode.
479 		 */
480 		return;
481 	}
482 
483 	/*
484 	 * Start reclaiming blocks only when the journal head has no references. Then stop when a
485 	 * block is referenced or reap reaches the most recently written block, referenced by the
486 	 * slab summary, which has the sequence number just before the tail.
487 	 */
488 	while ((journal->unreapable < journal->tail) && (journal->reap_lock->count == 0)) {
489 		reaped = true;
490 		journal->unreapable++;
491 		journal->reap_lock++;
492 		if (journal->reap_lock == &journal->locks[journal->size])
493 			journal->reap_lock = &journal->locks[0];
494 	}
495 
496 	if (!reaped)
497 		return;
498 
499 	/*
500 	 * It is never safe to reap a slab journal block without first issuing a flush, regardless
501 	 * of whether a user flush has been received or not. In the absence of the flush, the
502 	 * reference block write which released the locks allowing the slab journal to reap may not
503 	 * be persisted. Although slab summary writes will eventually issue flushes, multiple slab
504 	 * journal block writes can be issued while previous slab summary updates have not yet been
505 	 * made. Even though those slab journal block writes will be ignored if the slab summary
506 	 * update is not persisted, they may still overwrite the to-be-reaped slab journal block
507 	 * resulting in a loss of reference count updates.
508 	 */
509 	journal->flush_waiter.callback = flush_for_reaping;
510 	acquire_vio_from_pool(journal->slab->allocator->vio_pool,
511 			      &journal->flush_waiter);
512 }
513 
514 /**
515  * adjust_slab_journal_block_reference() - Adjust the reference count for a slab journal block.
516  * @journal: The slab journal.
517  * @sequence_number: The journal sequence number of the referenced block.
518  * @adjustment: Amount to adjust the reference counter.
519  *
520  * Note that when the adjustment is negative, the slab journal will be reaped.
521  */
522 static void adjust_slab_journal_block_reference(struct slab_journal *journal,
523 						sequence_number_t sequence_number,
524 						int adjustment)
525 {
526 	struct journal_lock *lock;
527 
528 	if (sequence_number == 0)
529 		return;
530 
531 	if (journal->slab->status == VDO_SLAB_REPLAYING) {
532 		/* Locks should not be used during offline replay. */
533 		return;
534 	}
535 
536 	VDO_ASSERT_LOG_ONLY((adjustment != 0), "adjustment must be non-zero");
537 	lock = get_lock(journal, sequence_number);
538 	if (adjustment < 0) {
539 		VDO_ASSERT_LOG_ONLY((-adjustment <= lock->count),
540 				    "adjustment %d of lock count %u for slab journal block %llu must not underflow",
541 				    adjustment, lock->count,
542 				    (unsigned long long) sequence_number);
543 	}
544 
545 	lock->count += adjustment;
546 	if (lock->count == 0)
547 		reap_slab_journal(journal);
548 }
549 
550 /**
551  * release_journal_locks() - Callback invoked after a slab summary update completes.
552  * @waiter: The slab summary waiter that has just been notified.
553  * @context: The result code of the update.
554  *
555  * Registered in the constructor on behalf of update_tail_block_location().
556  *
557  * Implements waiter_callback_fn.
558  */
559 static void release_journal_locks(struct vdo_waiter *waiter, void *context)
560 {
561 	sequence_number_t first, i;
562 	struct slab_journal *journal =
563 		container_of(waiter, struct slab_journal, slab_summary_waiter);
564 	int result = *((int *) context);
565 
566 	if (result != VDO_SUCCESS) {
567 		if (result != VDO_READ_ONLY) {
568 			/*
569 			 * Don't bother logging what might be lots of errors if we are already in
570 			 * read-only mode.
571 			 */
572 			vdo_log_error_strerror(result, "failed slab summary update %llu",
573 					       (unsigned long long) journal->summarized);
574 		}
575 
576 		journal->updating_slab_summary = false;
577 		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
578 		check_if_slab_drained(journal->slab);
579 		return;
580 	}
581 
582 	if (journal->partial_write_in_progress && (journal->summarized == journal->tail)) {
583 		journal->partial_write_in_progress = false;
584 		add_entries(journal);
585 	}
586 
587 	first = journal->last_summarized;
588 	journal->last_summarized = journal->summarized;
589 	for (i = journal->summarized - 1; i >= first; i--) {
590 		/*
591 		 * Release the lock the summarized block held on the recovery journal. (During
592 		 * replay, recovery_start will always be 0.)
593 		 */
594 		if (journal->recovery_journal != NULL) {
595 			zone_count_t zone_number = journal->slab->allocator->zone_number;
596 			struct journal_lock *lock = get_lock(journal, i);
597 
598 			vdo_release_recovery_journal_block_reference(journal->recovery_journal,
599 								     lock->recovery_start,
600 								     VDO_ZONE_TYPE_PHYSICAL,
601 								     zone_number);
602 		}
603 
604 		/*
605 		 * Release our own lock against reaping for blocks that are committed. (This
606 		 * function will not change locks during replay.)
607 		 */
608 		adjust_slab_journal_block_reference(journal, i, -1);
609 	}
610 
611 	journal->updating_slab_summary = false;
612 
613 	reap_slab_journal(journal);
614 
615 	/* Check if the slab summary needs to be updated again. */
616 	update_tail_block_location(journal);
617 }
618 
619 /**
620  * update_tail_block_location() - Update the tail block location in the slab summary, if necessary.
621  * @journal: The slab journal that is updating its tail block location.
622  */
623 static void update_tail_block_location(struct slab_journal *journal)
624 {
625 	block_count_t free_block_count;
626 	struct vdo_slab *slab = journal->slab;
627 
628 	if (journal->updating_slab_summary ||
629 	    vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
630 	    (journal->last_summarized >= journal->next_commit)) {
631 		check_if_slab_drained(slab);
632 		return;
633 	}
634 
635 	if (slab->status != VDO_SLAB_REBUILT) {
636 		u8 hint = slab->allocator->summary_entries[slab->slab_number].fullness_hint;
637 
638 		free_block_count = ((block_count_t) hint) << slab->allocator->depot->hint_shift;
639 	} else {
640 		free_block_count = slab->free_blocks;
641 	}
642 
643 	journal->summarized = journal->next_commit;
644 	journal->updating_slab_summary = true;
645 
646 	/*
647 	 * Update slab summary as dirty.
648 	 * vdo_slab journal can only reap past sequence number 1 when all the ref counts for this
649 	 * slab have been written to the layer. Therefore, indicate that the ref counts must be
650 	 * loaded when the journal head has reaped past sequence number 1.
651 	 */
652 	update_slab_summary_entry(slab, &journal->slab_summary_waiter,
653 				  journal->summarized % journal->size,
654 				  (journal->head > 1), false, free_block_count);
655 }
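/*
 * A worked example of the offset recorded above, assuming journal->size is
 * 224: committing through sequence number 500 stores tail_block_offset
 * 500 % 224 == 52 in the summary, matching the ring indexing used by
 * get_lock() and by write_slab_journal_block().
 */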
656 
657 /**
658  * reopen_slab_journal() - Reopen a slab's journal by emptying it and then adding pending entries.
659  * @slab: The slab to reopen.
660  */
661 static void reopen_slab_journal(struct vdo_slab *slab)
662 {
663 	struct slab_journal *journal = &slab->journal;
664 	sequence_number_t block;
665 
666 	VDO_ASSERT_LOG_ONLY(journal->tail_header.entry_count == 0,
667 			    "vdo_slab journal's active block empty before reopening");
668 	journal->head = journal->tail;
669 	initialize_journal_state(journal);
670 
671 	/* Ensure no locks are spuriously held on an empty journal. */
672 	for (block = 1; block <= journal->size; block++) {
673 		VDO_ASSERT_LOG_ONLY((get_lock(journal, block)->count == 0),
674 				    "Scrubbed journal's block %llu is not locked",
675 				    (unsigned long long) block);
676 	}
677 
678 	add_entries(journal);
679 }
680 
681 static sequence_number_t get_committing_sequence_number(const struct pooled_vio *vio)
682 {
683 	const struct packed_slab_journal_block *block =
684 		(const struct packed_slab_journal_block *) vio->vio.data;
685 
686 	return __le64_to_cpu(block->header.sequence_number);
687 }
688 
689 /**
690  * complete_write() - Handle post-commit processing.
691  * @completion: The write vio as a completion.
692  *
693  * This is the callback registered by write_slab_journal_block().
694  */
695 static void complete_write(struct vdo_completion *completion)
696 {
697 	int result = completion->result;
698 	struct pooled_vio *pooled = vio_as_pooled_vio(as_vio(completion));
699 	struct slab_journal *journal = completion->parent;
700 	sequence_number_t committed = get_committing_sequence_number(pooled);
701 
702 	list_del_init(&pooled->list_entry);
703 	return_vio_to_pool(pooled);
704 
705 	if (result != VDO_SUCCESS) {
706 		vio_record_metadata_io_error(as_vio(completion));
707 		vdo_log_error_strerror(result, "cannot write slab journal block %llu",
708 				       (unsigned long long) committed);
709 		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
710 		check_if_slab_drained(journal->slab);
711 		return;
712 	}
713 
714 	WRITE_ONCE(journal->events->blocks_written, journal->events->blocks_written + 1);
715 
716 	if (list_empty(&journal->uncommitted_blocks)) {
717 		/* If no blocks are outstanding, then the commit point is at the tail. */
718 		journal->next_commit = journal->tail;
719 	} else {
720 		/* The commit point is always the beginning of the oldest incomplete block. */
721 		pooled = container_of(journal->uncommitted_blocks.next,
722 				      struct pooled_vio, list_entry);
723 		journal->next_commit = get_committing_sequence_number(pooled);
724 	}
725 
726 	update_tail_block_location(journal);
727 }
728 
729 static void write_slab_journal_endio(struct bio *bio)
730 {
731 	struct vio *vio = bio->bi_private;
732 	struct slab_journal *journal = vio->completion.parent;
733 
734 	continue_vio_after_io(vio, complete_write, journal->slab->allocator->thread_id);
735 }
736 
737 /**
738  * write_slab_journal_block() - Write a slab journal block.
739  * @waiter: The vio pool waiter which was just notified.
740  * @context: The vio pool entry for the write.
741  *
742  * Callback from acquire_vio_from_pool() registered in commit_tail().
743  */
744 static void write_slab_journal_block(struct vdo_waiter *waiter, void *context)
745 {
746 	struct pooled_vio *pooled = context;
747 	struct vio *vio = &pooled->vio;
748 	struct slab_journal *journal =
749 		container_of(waiter, struct slab_journal, resource_waiter);
750 	struct slab_journal_block_header *header = &journal->tail_header;
751 	int unused_entries = journal->entries_per_block - header->entry_count;
752 	physical_block_number_t block_number;
753 	const struct admin_state_code *operation;
754 
755 	header->head = journal->head;
756 	list_add_tail(&pooled->list_entry, &journal->uncommitted_blocks);
757 	vdo_pack_slab_journal_block_header(header, &journal->block->header);
758 
759 	/* Copy the tail block into the vio. */
760 	memcpy(pooled->vio.data, journal->block, VDO_BLOCK_SIZE);
761 
762 	VDO_ASSERT_LOG_ONLY(unused_entries >= 0, "vdo_slab journal block is not overfull");
763 	if (unused_entries > 0) {
764 		/*
765 		 * Release the per-entry locks for any unused entries in the block we are about to
766 		 * write.
767 		 */
768 		adjust_slab_journal_block_reference(journal, header->sequence_number,
769 						    -unused_entries);
770 		journal->partial_write_in_progress = !block_is_full(journal);
771 	}
772 
773 	block_number = journal->slab->journal_origin +
774 		(header->sequence_number % journal->size);
775 	vio->completion.parent = journal;
776 
777 	/*
778 	 * This block won't be read in recovery until the slab summary is updated to refer to it.
779 	 * The slab summary update does a flush which is sufficient to protect us from corruption
780 	 * due to out of order slab journal, reference block, or block map writes.
781 	 */
782 	vdo_submit_metadata_vio(vdo_forget(vio), block_number, write_slab_journal_endio,
783 				complete_write, REQ_OP_WRITE);
784 
785 	/* Since the write is submitted, the tail block structure can be reused. */
786 	journal->tail++;
787 	initialize_tail_block(journal);
788 	journal->waiting_to_commit = false;
789 
790 	operation = vdo_get_admin_state_code(&journal->slab->state);
791 	if (operation == VDO_ADMIN_STATE_WAITING_FOR_RECOVERY) {
792 		vdo_finish_operation(&journal->slab->state,
793 				     (vdo_is_read_only(journal->slab->allocator->depot->vdo) ?
794 				      VDO_READ_ONLY : VDO_SUCCESS));
795 		return;
796 	}
797 
798 	add_entries(journal);
799 }
800 
801 /**
802  * commit_tail() - Commit the tail block of the slab journal.
803  * @journal: The journal whose tail block should be committed.
804  */
805 static void commit_tail(struct slab_journal *journal)
806 {
807 	if ((journal->tail_header.entry_count == 0) && must_make_entries_to_flush(journal)) {
808 		/*
809 		 * There are no entries at the moment, but there are some waiters, so defer
810 		 * initiating the flush until those entries are ready to write.
811 		 */
812 		return;
813 	}
814 
815 	if (vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
816 	    journal->waiting_to_commit ||
817 	    (journal->tail_header.entry_count == 0)) {
818 		/*
819 		 * There is nothing to do since the tail block is empty, or writing, or the journal
820 		 * is in read-only mode.
821 		 */
822 		return;
823 	}
824 
825 	/*
826 	 * Since we are about to commit the tail block, this journal no longer needs to be on the
827 	 * list of journals which the recovery journal might ask to commit.
828 	 */
829 	mark_slab_journal_clean(journal);
830 
831 	journal->waiting_to_commit = true;
832 
833 	journal->resource_waiter.callback = write_slab_journal_block;
834 	acquire_vio_from_pool(journal->slab->allocator->vio_pool,
835 			      &journal->resource_waiter);
836 }
837 
838 /**
839  * encode_slab_journal_entry() - Encode a slab journal entry.
840  * @tail_header: The unpacked header for the block.
841  * @payload: The journal block payload to hold the entry.
842  * @sbn: The slab block number of the entry to encode.
843  * @operation: The type of the entry.
844  * @increment: True if this is an increment.
845  */
846 static void encode_slab_journal_entry(struct slab_journal_block_header *tail_header,
847 				      slab_journal_payload *payload,
848 				      slab_block_number sbn,
849 				      enum journal_operation operation,
850 				      bool increment)
851 {
852 	journal_entry_count_t entry_number = tail_header->entry_count++;
853 
854 	if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
855 		if (!tail_header->has_block_map_increments) {
856 			memset(payload->full_entries.entry_types, 0,
857 			       VDO_SLAB_JOURNAL_ENTRY_TYPES_SIZE);
858 			tail_header->has_block_map_increments = true;
859 		}
860 
861 		payload->full_entries.entry_types[entry_number / 8] |=
862 			((u8)1 << (entry_number % 8));
863 	}
864 
865 	vdo_pack_slab_journal_entry(&payload->entries[entry_number], sbn, increment);
866 }
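/*
 * A worked example of the bitmap above: for entry_number 10 the encoder sets
 * bit (10 % 8) == 2 of entry_types[10 / 8] == entry_types[1], marking entry 10
 * as a block map remapping rather than a data increment or decrement.
 */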
867 
868 /**
869  * expand_journal_point() - Convert a recovery journal journal_point which refers to both an
870  *                          increment and a decrement to a single point which refers to one or the
871  *                          other.
872  * @recovery_point: The journal point to convert.
873  * @increment: Whether the current entry is an increment.
874  *
 875  * Return: The expanded journal point.
 876  *
 877  * Each data_vio has only a single recovery journal point, but may need to make both
878  * increment and decrement entries in the same slab journal. In order to distinguish the two
879  * entries, the entry count of the expanded journal point is twice the actual recovery journal
880  * entry count for increments, and one more than that for decrements.
881  */
882 static struct journal_point expand_journal_point(struct journal_point recovery_point,
883 						 bool increment)
884 {
885 	recovery_point.entry_count *= 2;
886 	if (!increment)
887 		recovery_point.entry_count++;
888 
889 	return recovery_point;
890 }
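/*
 * A worked example: a recovery journal point of {sequence_number: 17,
 * entry_count: 3} expands to entry_count 6 for the increment entry and
 * entry_count 7 for the matching decrement, so both slab journal entries made
 * for one data_vio remain distinct and correctly ordered.
 */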
891 
892 /**
893  * add_entry() - Actually add an entry to the slab journal, potentially firing off a write if a
894  *               block becomes full.
895  * @journal: The slab journal to append to.
896  * @pbn: The pbn being adjusted.
897  * @operation: The type of entry to make.
898  * @increment: True if this is an increment.
899  * @recovery_point: The expanded recovery point.
900  *
901  * This function is synchronous.
902  */
903 static void add_entry(struct slab_journal *journal, physical_block_number_t pbn,
904 		      enum journal_operation operation, bool increment,
905 		      struct journal_point recovery_point)
906 {
907 	struct packed_slab_journal_block *block = journal->block;
908 	int result;
909 
910 	result = VDO_ASSERT(vdo_before_journal_point(&journal->tail_header.recovery_point,
911 						     &recovery_point),
912 			    "recovery journal point is monotonically increasing, recovery point: %llu.%u, block recovery point: %llu.%u",
913 			    (unsigned long long) recovery_point.sequence_number,
914 			    recovery_point.entry_count,
915 			    (unsigned long long) journal->tail_header.recovery_point.sequence_number,
916 			    journal->tail_header.recovery_point.entry_count);
917 	if (result != VDO_SUCCESS) {
918 		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
919 		return;
920 	}
921 
922 	if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
923 		result = VDO_ASSERT((journal->tail_header.entry_count <
924 				     journal->full_entries_per_block),
925 				    "block has room for full entries");
926 		if (result != VDO_SUCCESS) {
927 			vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo,
928 						 result);
929 			return;
930 		}
931 	}
932 
933 	encode_slab_journal_entry(&journal->tail_header, &block->payload,
934 				  pbn - journal->slab->start, operation, increment);
935 	journal->tail_header.recovery_point = recovery_point;
936 	if (block_is_full(journal))
937 		commit_tail(journal);
938 }
939 
940 static inline block_count_t journal_length(const struct slab_journal *journal)
941 {
942 	return journal->tail - journal->head;
943 }
944 
945 /**
946  * vdo_attempt_replay_into_slab() - Replay a recovery journal entry into a slab's journal.
947  * @slab: The slab to play into.
948  * @pbn: The PBN for the entry.
949  * @operation: The type of entry to add.
950  * @increment: True if this entry is an increment.
951  * @recovery_point: The recovery journal point corresponding to this entry.
952  * @parent: The completion to notify when there is space to add the entry if the entry could not be
953  *          added immediately.
954  *
955  * Return: True if the entry was added immediately.
956  */
957 bool vdo_attempt_replay_into_slab(struct vdo_slab *slab, physical_block_number_t pbn,
958 				  enum journal_operation operation, bool increment,
959 				  struct journal_point *recovery_point,
960 				  struct vdo_completion *parent)
961 {
962 	struct slab_journal *journal = &slab->journal;
963 	struct slab_journal_block_header *header = &journal->tail_header;
964 	struct journal_point expanded = expand_journal_point(*recovery_point, increment);
965 
966 	/* Only accept entries after the current recovery point. */
967 	if (!vdo_before_journal_point(&journal->tail_header.recovery_point, &expanded))
968 		return true;
969 
970 	if ((header->entry_count >= journal->full_entries_per_block) &&
971 	    (header->has_block_map_increments || (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING))) {
972 		/*
973 		 * The tail block does not have room for the entry we are attempting to add so
974 		 * commit the tail block now.
975 		 */
976 		commit_tail(journal);
977 	}
978 
979 	if (journal->waiting_to_commit) {
980 		vdo_start_operation_with_waiter(&journal->slab->state,
981 						VDO_ADMIN_STATE_WAITING_FOR_RECOVERY,
982 						parent, NULL);
983 		return false;
984 	}
985 
986 	if (journal_length(journal) >= journal->size) {
987 		/*
988 		 * We must have reaped the current head before the crash, since the blocked
989 		 * threshold keeps us from having more entries than fit in a slab journal; hence we
990 		 * can just advance the head (and unreapable block), as needed.
991 		 */
992 		journal->head++;
993 		journal->unreapable++;
994 	}
995 
996 	if (journal->slab->status == VDO_SLAB_REBUILT)
997 		journal->slab->status = VDO_SLAB_REPLAYING;
998 
999 	add_entry(journal, pbn, operation, increment, expanded);
1000 	return true;
1001 }
1002 
1003 /**
1004  * requires_reaping() - Check whether the journal must be reaped before adding new entries.
1005  * @journal: The journal to check.
1006  *
1007  * Return: True if the journal must be reaped.
1008  */
1009 static bool requires_reaping(const struct slab_journal *journal)
1010 {
1011 	return (journal_length(journal) >= journal->blocking_threshold);
1012 }
1013 
1014 /** finish_summary_update() - A waiter callback that resets the writing state of a slab. */
1015 static void finish_summary_update(struct vdo_waiter *waiter, void *context)
1016 {
1017 	struct vdo_slab *slab = container_of(waiter, struct vdo_slab, summary_waiter);
1018 	int result = *((int *) context);
1019 
1020 	slab->active_count--;
1021 
1022 	if ((result != VDO_SUCCESS) && (result != VDO_READ_ONLY)) {
1023 		vdo_log_error_strerror(result, "failed to update slab summary");
1024 		vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
1025 	}
1026 
1027 	check_if_slab_drained(slab);
1028 }
1029 
1030 static void write_reference_block(struct vdo_waiter *waiter, void *context);
1031 
1032 /**
1033  * launch_reference_block_write() - Launch the write of a dirty reference block by first acquiring
1034  *                                  a VIO for it from the pool.
1035  * @waiter: The waiter of the block which is starting to write.
1036  * @context: The parent slab of the block.
1037  *
1038  * This can be asynchronous since the writer will have to wait if all VIOs in the pool are
1039  * currently in use.
1040  */
1041 static void launch_reference_block_write(struct vdo_waiter *waiter, void *context)
1042 {
1043 	struct vdo_slab *slab = context;
1044 
1045 	if (vdo_is_read_only(slab->allocator->depot->vdo))
1046 		return;
1047 
1048 	slab->active_count++;
1049 	container_of(waiter, struct reference_block, waiter)->is_writing = true;
1050 	waiter->callback = write_reference_block;
1051 	acquire_vio_from_pool(slab->allocator->vio_pool, waiter);
1052 }
1053 
1054 static void save_dirty_reference_blocks(struct vdo_slab *slab)
1055 {
1056 	vdo_waitq_notify_all_waiters(&slab->dirty_blocks,
1057 				     launch_reference_block_write, slab);
1058 	check_if_slab_drained(slab);
1059 }
1060 
1061 /**
 1062  * finish_reference_block_write() - After a reference block has been written, clean it, release its
1063  *                                  locks, and return its VIO to the pool.
1064  * @completion: The VIO that just finished writing.
1065  */
1066 static void finish_reference_block_write(struct vdo_completion *completion)
1067 {
1068 	struct vio *vio = as_vio(completion);
1069 	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
1070 	struct reference_block *block = completion->parent;
1071 	struct vdo_slab *slab = block->slab;
1072 	tail_block_offset_t offset;
1073 
1074 	slab->active_count--;
1075 
1076 	/* Release the slab journal lock. */
1077 	adjust_slab_journal_block_reference(&slab->journal,
1078 					    block->slab_journal_lock_to_release, -1);
1079 	return_vio_to_pool(pooled);
1080 
1081 	/*
1082 	 * We can't clear the is_writing flag earlier as releasing the slab journal lock may cause
1083 	 * us to be dirtied again, but we don't want to double enqueue.
1084 	 */
1085 	block->is_writing = false;
1086 
1087 	if (vdo_is_read_only(completion->vdo)) {
1088 		check_if_slab_drained(slab);
1089 		return;
1090 	}
1091 
1092 	/* Re-queue the block if it was re-dirtied while it was writing. */
1093 	if (block->is_dirty) {
1094 		vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
1095 		if (vdo_is_state_draining(&slab->state)) {
1096 			/* We must be saving, and this block will otherwise not be relaunched. */
1097 			save_dirty_reference_blocks(slab);
1098 		}
1099 
1100 		return;
1101 	}
1102 
1103 	/*
1104 	 * Mark the slab as clean in the slab summary if there are no dirty or writing blocks
1105 	 * and no summary update in progress.
1106 	 */
1107 	if ((slab->active_count > 0) || vdo_waitq_has_waiters(&slab->dirty_blocks)) {
1108 		check_if_slab_drained(slab);
1109 		return;
1110 	}
1111 
1112 	offset = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
1113 	slab->active_count++;
1114 	slab->summary_waiter.callback = finish_summary_update;
1115 	update_slab_summary_entry(slab, &slab->summary_waiter, offset,
1116 				  true, true, slab->free_blocks);
1117 }
1118 
1119 /**
1120  * get_reference_counters_for_block() - Find the reference counters for a given block.
1121  * @block: The reference_block in question.
1122  *
1123  * Return: A pointer to the reference counters for this block.
1124  */
1125 static vdo_refcount_t * __must_check get_reference_counters_for_block(struct reference_block *block)
1126 {
1127 	size_t block_index = block - block->slab->reference_blocks;
1128 
1129 	return &block->slab->counters[block_index * COUNTS_PER_BLOCK];
1130 }
1131 
1132 /**
1133  * pack_reference_block() - Copy data from a reference block to a buffer ready to be written out.
1134  * @block: The block to copy.
1135  * @buffer: The char buffer to fill with the packed block.
1136  */
1137 static void pack_reference_block(struct reference_block *block, void *buffer)
1138 {
1139 	struct packed_reference_block *packed = buffer;
1140 	vdo_refcount_t *counters = get_reference_counters_for_block(block);
1141 	sector_count_t i;
1142 	struct packed_journal_point commit_point;
1143 
1144 	vdo_pack_journal_point(&block->slab->slab_journal_point, &commit_point);
1145 
1146 	for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
1147 		packed->sectors[i].commit_point = commit_point;
1148 		memcpy(packed->sectors[i].counts, counters + (i * COUNTS_PER_SECTOR),
1149 		       (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
1150 	}
1151 }
1152 
1153 static void write_reference_block_endio(struct bio *bio)
1154 {
1155 	struct vio *vio = bio->bi_private;
1156 	struct reference_block *block = vio->completion.parent;
1157 	thread_id_t thread_id = block->slab->allocator->thread_id;
1158 
1159 	continue_vio_after_io(vio, finish_reference_block_write, thread_id);
1160 }
1161 
1162 /**
1163  * handle_io_error() - Handle an I/O error reading or writing a reference count block.
1164  * @completion: The VIO doing the I/O as a completion.
1165  */
1166 static void handle_io_error(struct vdo_completion *completion)
1167 {
1168 	int result = completion->result;
1169 	struct vio *vio = as_vio(completion);
1170 	struct vdo_slab *slab = ((struct reference_block *) completion->parent)->slab;
1171 
1172 	vio_record_metadata_io_error(vio);
1173 	return_vio_to_pool(vio_as_pooled_vio(vio));
1174 	slab->active_count -= vio->io_size / VDO_BLOCK_SIZE;
1175 	vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
1176 	check_if_slab_drained(slab);
1177 }
1178 
1179 /**
1180  * write_reference_block() - After a dirty block waiter has gotten a VIO from the VIO pool, copy
1181  *                           its counters and associated data into the VIO, and launch the write.
1182  * @waiter: The waiter of the dirty block.
1183  * @context: The VIO returned by the pool.
1184  */
1185 static void write_reference_block(struct vdo_waiter *waiter, void *context)
1186 {
1187 	size_t block_offset;
1188 	physical_block_number_t pbn;
1189 	struct pooled_vio *pooled = context;
1190 	struct vdo_completion *completion = &pooled->vio.completion;
1191 	struct reference_block *block = container_of(waiter, struct reference_block,
1192 						     waiter);
1193 
1194 	pack_reference_block(block, pooled->vio.data);
1195 	block_offset = (block - block->slab->reference_blocks);
1196 	pbn = (block->slab->ref_counts_origin + block_offset);
1197 	block->slab_journal_lock_to_release = block->slab_journal_lock;
1198 	completion->parent = block;
1199 
1200 	/*
1201 	 * Mark the block as clean, since we won't be committing any updates that happen after this
1202 	 * moment. As long as VIO order is preserved, two VIOs updating this block at once will not
1203 	 * cause complications.
1204 	 */
1205 	block->is_dirty = false;
1206 
1207 	/*
1208 	 * Flush before writing to ensure that the recovery journal and slab journal entries which
1209 	 * cover this reference update are stable. This prevents data corruption that can be caused
1210 	 * by out of order writes.
1211 	 */
1212 	WRITE_ONCE(block->slab->allocator->ref_counts_statistics.blocks_written,
1213 		   block->slab->allocator->ref_counts_statistics.blocks_written + 1);
1214 
1215 	completion->callback_thread_id = ((struct block_allocator *) pooled->context)->thread_id;
1216 	vdo_submit_metadata_vio(&pooled->vio, pbn, write_reference_block_endio,
1217 				handle_io_error, REQ_OP_WRITE | REQ_PREFLUSH);
1218 }
1219 
1220 static void reclaim_journal_space(struct slab_journal *journal)
1221 {
1222 	block_count_t length = journal_length(journal);
1223 	struct vdo_slab *slab = journal->slab;
1224 	block_count_t write_count = vdo_waitq_num_waiters(&slab->dirty_blocks);
1225 	block_count_t written;
1226 
1227 	if ((length < journal->flushing_threshold) || (write_count == 0))
1228 		return;
1229 
1230 	/* The slab journal is over the first threshold, schedule some reference block writes. */
1231 	WRITE_ONCE(journal->events->flush_count, journal->events->flush_count + 1);
1232 	if (length < journal->flushing_deadline) {
1233 		/* Schedule more writes the closer to the deadline we get. */
1234 		write_count /= journal->flushing_deadline - length + 1;
1235 		write_count = max_t(block_count_t, write_count, 1);
1236 	}
1237 
1238 	for (written = 0; written < write_count; written++) {
1239 		vdo_waitq_notify_next_waiter(&slab->dirty_blocks,
1240 					     launch_reference_block_write, slab);
1241 	}
1242 }
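/*
 * A worked example, assuming flushing_threshold is 60, flushing_deadline is 80
 * (both are really derived from the journal size), and 40 dirty reference
 * blocks: at journal length 60 the divisor is 80 - 60 + 1 == 21, so
 * 40 / 21 == 1 write is launched; at length 75 the divisor is 6, launching 6
 * writes; at or beyond the deadline all 40 dirty blocks are written.
 */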
1243 
1244 /**
1245  * reference_count_to_status() - Convert a reference count to a reference status.
1246  * @count: The count to convert.
1247  *
1248  * Return: The appropriate reference status.
1249  */
1250 static enum reference_status __must_check reference_count_to_status(vdo_refcount_t count)
1251 {
1252 	if (count == EMPTY_REFERENCE_COUNT)
1253 		return RS_FREE;
1254 	else if (count == 1)
1255 		return RS_SINGLE;
1256 	else if (count == PROVISIONAL_REFERENCE_COUNT)
1257 		return RS_PROVISIONAL;
1258 	else
1259 		return RS_SHARED;
1260 }
1261 
1262 /**
1263  * dirty_block() - Mark a reference count block as dirty, potentially adding it to the dirty queue
1264  *                 if it wasn't already dirty.
1265  * @block: The reference block to mark as dirty.
1266  */
1267 static void dirty_block(struct reference_block *block)
1268 {
1269 	if (block->is_dirty)
1270 		return;
1271 
1272 	block->is_dirty = true;
1273 	if (!block->is_writing)
1274 		vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
1275 }
1276 
1277 /**
1278  * get_reference_block() - Get the reference block that covers the given block index.
1279  * @slab: The slab containing the references.
1280  * @index: The index of the physical block.
1281  */
1282 static struct reference_block * __must_check get_reference_block(struct vdo_slab *slab,
1283 								 slab_block_number index)
1284 {
1285 	return &slab->reference_blocks[index / COUNTS_PER_BLOCK];
1286 }
1287 
1288 /**
1289  * slab_block_number_from_pbn() - Determine the index within the slab of a particular physical
1290  *                                block number.
1291  * @slab: The slab.
1292  * @pbn: The physical block number.
1293  * @slab_block_number_ptr: A pointer to the slab block number.
1294  *
1295  * Return: VDO_SUCCESS or an error code.
1296  */
1297 static int __must_check slab_block_number_from_pbn(struct vdo_slab *slab,
1298 						   physical_block_number_t pbn,
1299 						   slab_block_number *slab_block_number_ptr)
1300 {
1301 	u64 slab_block_number;
1302 
1303 	if (pbn < slab->start)
1304 		return VDO_OUT_OF_RANGE;
1305 
1306 	slab_block_number = pbn - slab->start;
1307 	if (slab_block_number >= slab->allocator->depot->slab_config.data_blocks)
1308 		return VDO_OUT_OF_RANGE;
1309 
1310 	*slab_block_number_ptr = slab_block_number;
1311 	return VDO_SUCCESS;
1312 }
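/*
 * A worked example, assuming a slab with start == 10000 and
 * slab_config.data_blocks == 8192: pbn 9999 and pbn 18192 are both rejected
 * with VDO_OUT_OF_RANGE, while pbn 10052 maps to slab block number 52.
 */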
1313 
1314 /**
1315  * get_reference_counter() - Get the reference counter that covers the given physical block number.
1316  * @slab: The slab to query.
1317  * @pbn: The physical block number.
1318  * @counter_ptr: A pointer to the reference counter.
1319  */
1320 static int __must_check get_reference_counter(struct vdo_slab *slab,
1321 					      physical_block_number_t pbn,
1322 					      vdo_refcount_t **counter_ptr)
1323 {
1324 	slab_block_number index;
1325 	int result = slab_block_number_from_pbn(slab, pbn, &index);
1326 
1327 	if (result != VDO_SUCCESS)
1328 		return result;
1329 
1330 	*counter_ptr = &slab->counters[index];
1331 
1332 	return VDO_SUCCESS;
1333 }
1334 
1335 static unsigned int calculate_slab_priority(struct vdo_slab *slab)
1336 {
1337 	block_count_t free_blocks = slab->free_blocks;
1338 	unsigned int unopened_slab_priority = slab->allocator->unopened_slab_priority;
1339 	unsigned int priority;
1340 
1341 	/*
1342 	 * Wholly full slabs must be the only ones with lowest priority, 0.
1343 	 *
1344 	 * Slabs that have never been opened (empty, newly initialized, and never been written to)
1345 	 * have lower priority than previously opened slabs that have a significant number of free
1346 	 * blocks. This ranking causes VDO to avoid writing physical blocks for the first time
1347 	 * unless there are very few free blocks that have been previously written to.
1348 	 *
1349 	 * Since VDO doesn't discard blocks currently, reusing previously written blocks makes VDO
1350 	 * a better client of any underlying storage that is thinly-provisioned (though discarding
1351 	 * would be better).
1352 	 *
1353 	 * For all other slabs, the priority is derived from the logarithm of the number of free
1354 	 * blocks. Slabs with the same order of magnitude of free blocks have the same priority.
1355 	 * With 2^23 blocks, the priority will range from 1 to 25. The reserved
1356 	 * unopened_slab_priority divides the range and is skipped by the logarithmic mapping.
1357 	 */
1358 
1359 	if (free_blocks == 0)
1360 		return 0;
1361 
1362 	if (is_slab_journal_blank(slab))
1363 		return unopened_slab_priority;
1364 
1365 	priority = (1 + ilog2(free_blocks));
1366 	return ((priority < unopened_slab_priority) ? priority : priority + 1);
1367 }
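/*
 * A worked example, assuming unopened_slab_priority is 14 (the real value is
 * chosen from the slab size when the allocator is created): a full slab gets
 * priority 0, a blank slab gets 14, a slab with 300 free blocks gets
 * 1 + ilog2(300) == 9, and a slab with 1,000,000 free blocks computes
 * 1 + ilog2(1000000) == 20, which is bumped to 21 to skip the reserved
 * unopened priority.
 */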
1368 
1369 /*
1370  * Slabs are essentially prioritized by an approximation of the number of free blocks in the slab
1371  * so slabs with lots of free blocks will be opened for allocation before slabs that have few free
1372  * blocks.
1373  */
1374 static void prioritize_slab(struct vdo_slab *slab)
1375 {
1376 	VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
1377 			    "a slab must not already be on a list when prioritizing");
1378 	slab->priority = calculate_slab_priority(slab);
1379 	vdo_priority_table_enqueue(slab->allocator->prioritized_slabs,
1380 				   slab->priority, &slab->allocq_entry);
1381 }
1382 
1383 /**
1384  * adjust_free_block_count() - Adjust the free block count and (if needed) reprioritize the slab.
1385  * @slab: The slab.
1386  * @incremented: True if the free block count went up.
1387  */
1388 static void adjust_free_block_count(struct vdo_slab *slab, bool incremented)
1389 {
1390 	struct block_allocator *allocator = slab->allocator;
1391 
1392 	WRITE_ONCE(allocator->allocated_blocks,
1393 		   allocator->allocated_blocks + (incremented ? -1 : 1));
1394 
1395 	/* The open slab doesn't need to be reprioritized until it is closed. */
1396 	if (slab == allocator->open_slab)
1397 		return;
1398 
1399 	/* Don't bother adjusting the priority table if unneeded. */
1400 	if (slab->priority == calculate_slab_priority(slab))
1401 		return;
1402 
1403 	/*
1404 	 * Reprioritize the slab to reflect the new free block count by removing it from the table
1405 	 * and re-enqueuing it with the new priority.
1406 	 */
1407 	vdo_priority_table_remove(allocator->prioritized_slabs, &slab->allocq_entry);
1408 	prioritize_slab(slab);
1409 }
1410 
1411 /**
1412  * increment_for_data() - Increment the reference count for a data block.
1413  * @slab: The slab which owns the block.
1414  * @block: The reference block which contains the block being updated.
1415  * @block_number: The block to update.
1416  * @old_status: The reference status of the data block before this increment.
1417  * @lock: The pbn_lock associated with this increment (may be NULL).
1418  * @counter_ptr: A pointer to the count for the data block (in, out).
1419  * @adjust_block_count: Whether to update the allocator's free block count.
1420  *
1421  * Return: VDO_SUCCESS or an error.
1422  */
increment_for_data(struct vdo_slab * slab,struct reference_block * block,slab_block_number block_number,enum reference_status old_status,struct pbn_lock * lock,vdo_refcount_t * counter_ptr,bool adjust_block_count)1423 static int increment_for_data(struct vdo_slab *slab, struct reference_block *block,
1424 			      slab_block_number block_number,
1425 			      enum reference_status old_status,
1426 			      struct pbn_lock *lock, vdo_refcount_t *counter_ptr,
1427 			      bool adjust_block_count)
1428 {
1429 	switch (old_status) {
1430 	case RS_FREE:
1431 		*counter_ptr = 1;
1432 		block->allocated_count++;
1433 		slab->free_blocks--;
1434 		if (adjust_block_count)
1435 			adjust_free_block_count(slab, false);
1436 
1437 		break;
1438 
1439 	case RS_PROVISIONAL:
1440 		*counter_ptr = 1;
1441 		break;
1442 
1443 	default:
1444 		/* Single or shared */
1445 		if (*counter_ptr >= MAXIMUM_REFERENCE_COUNT) {
1446 			return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1447 						      "Incrementing a block already having 254 references (slab %u, offset %u)",
1448 						      slab->slab_number, block_number);
1449 		}
1450 		(*counter_ptr)++;
1451 	}
1452 
1453 	if (lock != NULL)
1454 		vdo_unassign_pbn_lock_provisional_reference(lock);
1455 	return VDO_SUCCESS;
1456 }
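/*
 * Note on increment_for_data() above (added commentary): a free or provisionally
 * referenced block becomes singly referenced (count 1), while a block that is
 * already singly or multiply referenced simply has its count bumped, with 254
 * (MAXIMUM_REFERENCE_COUNT) enforced as the ceiling via VDO_REF_COUNT_INVALID;
 * in every successful case, any provisional PBN lock reference is released.
 */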
1457 
1458 /**
1459  * decrement_for_data() - Decrement the reference count for a data block.
1460  * @slab: The slab which owns the block.
1461  * @block: The reference block which contains the block being updated.
1462  * @block_number: The block to update.
1463  * @old_status: The reference status of the data block before this decrement.
1464  * @updater: The reference updater doing this operation in case we need to look up the pbn lock.
1465  * @counter_ptr: A pointer to the count for the data block (in, out).
1466  * @adjust_block_count: Whether to update the allocator's free block count.
1467  *
1468  * Return: VDO_SUCCESS or an error.
1469  */
decrement_for_data(struct vdo_slab * slab,struct reference_block * block,slab_block_number block_number,enum reference_status old_status,struct reference_updater * updater,vdo_refcount_t * counter_ptr,bool adjust_block_count)1470 static int decrement_for_data(struct vdo_slab *slab, struct reference_block *block,
1471 			      slab_block_number block_number,
1472 			      enum reference_status old_status,
1473 			      struct reference_updater *updater,
1474 			      vdo_refcount_t *counter_ptr, bool adjust_block_count)
1475 {
1476 	switch (old_status) {
1477 	case RS_FREE:
1478 		return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1479 					      "Decrementing free block at offset %u in slab %u",
1480 					      block_number, slab->slab_number);
1481 
1482 	case RS_PROVISIONAL:
1483 	case RS_SINGLE:
1484 		if (updater->zpbn.zone != NULL) {
1485 			struct pbn_lock *lock = vdo_get_physical_zone_pbn_lock(updater->zpbn.zone,
1486 									       updater->zpbn.pbn);
1487 
1488 			if (lock != NULL) {
1489 				/*
1490 				 * There is a read lock on this block, so the block must not become
1491 				 * unreferenced.
1492 				 */
1493 				*counter_ptr = PROVISIONAL_REFERENCE_COUNT;
1494 				vdo_assign_pbn_lock_provisional_reference(lock);
1495 				break;
1496 			}
1497 		}
1498 
1499 		*counter_ptr = EMPTY_REFERENCE_COUNT;
1500 		block->allocated_count--;
1501 		slab->free_blocks++;
1502 		if (adjust_block_count)
1503 			adjust_free_block_count(slab, true);
1504 
1505 		break;
1506 
1507 	default:
1508 		/* Shared */
1509 		(*counter_ptr)--;
1510 	}
1511 
1512 	return VDO_SUCCESS;
1513 }
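/*
 * Note on decrement_for_data() above (added commentary): decrementing a free
 * block is an error; a provisionally or singly referenced block either drops to
 * empty or, when a read lock is held on the physical block, is parked at
 * PROVISIONAL_REFERENCE_COUNT so it does not become unreferenced; a shared
 * block simply has its count reduced by one.
 */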
1514 
1515 /**
1516  * increment_for_block_map() - Increment the reference count for a block map page.
1517  * @slab: The slab which owns the block.
1518  * @block: The reference block which contains the block being updated.
1519  * @block_number: The block to update.
1520  * @old_status: The reference status of the block before this increment.
1521  * @lock: The pbn_lock associated with this increment (may be NULL).
1522  * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
1523  * @counter_ptr: A pointer to the count for the block (in, out).
1524  * @adjust_block_count: Whether to update the allocator's free block count.
1525  *
1526  * All block map increments should be from provisional to MAXIMUM_REFERENCE_COUNT. Since block map
1527  * blocks never dedupe, they should never be adjusted from any other state. The adjustment always
1528  * results in MAXIMUM_REFERENCE_COUNT as this value is used to prevent dedupe against block map
1529  * blocks.
1530  *
1531  * Return: VDO_SUCCESS or an error.
1532  */
increment_for_block_map(struct vdo_slab * slab,struct reference_block * block,slab_block_number block_number,enum reference_status old_status,struct pbn_lock * lock,bool normal_operation,vdo_refcount_t * counter_ptr,bool adjust_block_count)1533 static int increment_for_block_map(struct vdo_slab *slab, struct reference_block *block,
1534 				   slab_block_number block_number,
1535 				   enum reference_status old_status,
1536 				   struct pbn_lock *lock, bool normal_operation,
1537 				   vdo_refcount_t *counter_ptr, bool adjust_block_count)
1538 {
1539 	switch (old_status) {
1540 	case RS_FREE:
1541 		if (normal_operation) {
1542 			return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1543 						      "Incrementing unallocated block map block (slab %u, offset %u)",
1544 						      slab->slab_number, block_number);
1545 		}
1546 
1547 		*counter_ptr = MAXIMUM_REFERENCE_COUNT;
1548 		block->allocated_count++;
1549 		slab->free_blocks--;
1550 		if (adjust_block_count)
1551 			adjust_free_block_count(slab, false);
1552 
1553 		return VDO_SUCCESS;
1554 
1555 	case RS_PROVISIONAL:
1556 		if (!normal_operation)
1557 			return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1558 						      "Block map block had provisional reference during replay (slab %u, offset %u)",
1559 						      slab->slab_number, block_number);
1560 
1561 		*counter_ptr = MAXIMUM_REFERENCE_COUNT;
1562 		if (lock != NULL)
1563 			vdo_unassign_pbn_lock_provisional_reference(lock);
1564 		return VDO_SUCCESS;
1565 
1566 	default:
1567 		return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1568 					      "Incrementing a block map block which is already referenced %u times (slab %u, offset %u)",
1569 					      *counter_ptr, slab->slab_number,
1570 					      block_number);
1571 	}
1572 }
1573 
is_valid_journal_point(const struct journal_point * point)1574 static bool __must_check is_valid_journal_point(const struct journal_point *point)
1575 {
1576 	return ((point != NULL) && (point->sequence_number > 0));
1577 }
1578 
1579 /**
1580  * update_reference_count() - Update the reference count of a block.
1581  * @slab: The slab which owns the block.
1582  * @block: The reference block which contains the block being updated.
1583  * @block_number: The block to update.
1584  * @slab_journal_point: The slab journal point at which this update is journaled.
1585  * @updater: The reference updater.
1586  * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
1587  * @adjust_block_count: Whether to update the allocator's free block count.
1588  * @provisional_decrement_ptr: A pointer which will be set to true if this update was a decrement
1589  *                             of a provisional reference.
1590  *
1591  * Return: VDO_SUCCESS or an error.
1592  */
update_reference_count(struct vdo_slab * slab,struct reference_block * block,slab_block_number block_number,const struct journal_point * slab_journal_point,struct reference_updater * updater,bool normal_operation,bool adjust_block_count,bool * provisional_decrement_ptr)1593 static int update_reference_count(struct vdo_slab *slab, struct reference_block *block,
1594 				  slab_block_number block_number,
1595 				  const struct journal_point *slab_journal_point,
1596 				  struct reference_updater *updater,
1597 				  bool normal_operation, bool adjust_block_count,
1598 				  bool *provisional_decrement_ptr)
1599 {
1600 	vdo_refcount_t *counter_ptr = &slab->counters[block_number];
1601 	enum reference_status old_status = reference_count_to_status(*counter_ptr);
1602 	int result;
1603 
1604 	if (!updater->increment) {
1605 		result = decrement_for_data(slab, block, block_number, old_status,
1606 					    updater, counter_ptr, adjust_block_count);
1607 		if ((result == VDO_SUCCESS) && (old_status == RS_PROVISIONAL)) {
1608 			if (provisional_decrement_ptr != NULL)
1609 				*provisional_decrement_ptr = true;
1610 			return VDO_SUCCESS;
1611 		}
1612 	} else if (updater->operation == VDO_JOURNAL_DATA_REMAPPING) {
1613 		result = increment_for_data(slab, block, block_number, old_status,
1614 					    updater->lock, counter_ptr, adjust_block_count);
1615 	} else {
1616 		result = increment_for_block_map(slab, block, block_number, old_status,
1617 						 updater->lock, normal_operation,
1618 						 counter_ptr, adjust_block_count);
1619 	}
1620 
1621 	if (result != VDO_SUCCESS)
1622 		return result;
1623 
1624 	if (is_valid_journal_point(slab_journal_point))
1625 		slab->slab_journal_point = *slab_journal_point;
1626 
1627 	return VDO_SUCCESS;
1628 }
1629 
adjust_reference_count(struct vdo_slab * slab,struct reference_updater * updater,const struct journal_point * slab_journal_point)1630 static int __must_check adjust_reference_count(struct vdo_slab *slab,
1631 					       struct reference_updater *updater,
1632 					       const struct journal_point *slab_journal_point)
1633 {
1634 	slab_block_number block_number;
1635 	int result;
1636 	struct reference_block *block;
1637 	bool provisional_decrement = false;
1638 
1639 	if (!is_slab_open(slab))
1640 		return VDO_INVALID_ADMIN_STATE;
1641 
1642 	result = slab_block_number_from_pbn(slab, updater->zpbn.pbn, &block_number);
1643 	if (result != VDO_SUCCESS)
1644 		return result;
1645 
1646 	block = get_reference_block(slab, block_number);
1647 	result = update_reference_count(slab, block, block_number, slab_journal_point,
1648 					updater, NORMAL_OPERATION, true,
1649 					&provisional_decrement);
1650 	if ((result != VDO_SUCCESS) || provisional_decrement)
1651 		return result;
1652 
1653 	if (block->is_dirty && (block->slab_journal_lock > 0)) {
1654 		sequence_number_t entry_lock = slab_journal_point->sequence_number;
1655 		/*
1656 		 * This block is already dirty and a slab journal entry has been made for it since
1657 		 * the last time it was clean. We must release the per-entry slab journal lock for
1658 		 * the entry associated with the update we are now doing.
1659 		 */
1660 		result = VDO_ASSERT(is_valid_journal_point(slab_journal_point),
1661 				    "Reference count adjustments need slab journal points.");
1662 		if (result != VDO_SUCCESS)
1663 			return result;
1664 
1665 		adjust_slab_journal_block_reference(&slab->journal, entry_lock, -1);
1666 		return VDO_SUCCESS;
1667 	}
1668 
1669 	/*
1670 	 * This may be the first time we are applying an update for which there is a slab journal
1671 	 * entry to this block since the block was cleaned. Therefore, we convert the per-entry
1672 	 * slab journal lock to an uncommitted reference block lock, if there is a per-entry lock.
1673 	 */
1674 	if (is_valid_journal_point(slab_journal_point))
1675 		block->slab_journal_lock = slab_journal_point->sequence_number;
1676 	else
1677 		block->slab_journal_lock = 0;
1678 
1679 	dirty_block(block);
1680 	return VDO_SUCCESS;
1681 }
1682 
1683 /**
1684  * add_entry_from_waiter() - Add an entry to the slab journal.
1685  * @waiter: The vio which should make an entry now.
1686  * @context: The slab journal to make an entry in.
1687  *
1688  * This callback is invoked by add_entries() once it has determined that we are ready to make
1689  * another entry in the slab journal. Implements waiter_callback_fn.
1690  */
add_entry_from_waiter(struct vdo_waiter * waiter,void * context)1691 static void add_entry_from_waiter(struct vdo_waiter *waiter, void *context)
1692 {
1693 	int result;
1694 	struct reference_updater *updater =
1695 		container_of(waiter, struct reference_updater, waiter);
1696 	struct data_vio *data_vio = data_vio_from_reference_updater(updater);
1697 	struct slab_journal *journal = context;
1698 	struct slab_journal_block_header *header = &journal->tail_header;
1699 	struct journal_point slab_journal_point = {
1700 		.sequence_number = header->sequence_number,
1701 		.entry_count = header->entry_count,
1702 	};
1703 	sequence_number_t recovery_block = data_vio->recovery_journal_point.sequence_number;
1704 
1705 	if (header->entry_count == 0) {
1706 		/*
1707 		 * This is the first entry in the current tail block, so get a lock on the recovery
1708 		 * journal which we will hold until this tail block is committed.
1709 		 */
1710 		get_lock(journal, header->sequence_number)->recovery_start = recovery_block;
1711 		if (journal->recovery_journal != NULL) {
1712 			zone_count_t zone_number = journal->slab->allocator->zone_number;
1713 
1714 			vdo_acquire_recovery_journal_block_reference(journal->recovery_journal,
1715 								     recovery_block,
1716 								     VDO_ZONE_TYPE_PHYSICAL,
1717 								     zone_number);
1718 		}
1719 
1720 		mark_slab_journal_dirty(journal, recovery_block);
1721 		reclaim_journal_space(journal);
1722 	}
1723 
1724 	add_entry(journal, updater->zpbn.pbn, updater->operation, updater->increment,
1725 		  expand_journal_point(data_vio->recovery_journal_point,
1726 				       updater->increment));
1727 
1728 	if (journal->slab->status != VDO_SLAB_REBUILT) {
1729 		/*
1730 		 * If the slab is unrecovered, scrubbing will take care of the count since the
1731 		 * update is now recorded in the journal.
1732 		 */
1733 		adjust_slab_journal_block_reference(journal,
1734 						    slab_journal_point.sequence_number, -1);
1735 		result = VDO_SUCCESS;
1736 	} else {
1737 		/* Now that an entry has been made in the slab journal, update the counter. */
1738 		result = adjust_reference_count(journal->slab, updater,
1739 						&slab_journal_point);
1740 	}
1741 
1742 	if (updater->increment)
1743 		continue_data_vio_with_error(data_vio, result);
1744 	else
1745 		vdo_continue_completion(&data_vio->decrement_completion, result);
1746 }
1747 
1748 /**
1749  * is_next_entry_a_block_map_increment() - Check whether the next entry to be made is a block map
1750  *                                         increment.
1751  * @journal: The journal.
1752  *
1753  * Return: true if the first entry waiter's operation is a block map increment.
1754  */
is_next_entry_a_block_map_increment(struct slab_journal * journal)1755 static inline bool is_next_entry_a_block_map_increment(struct slab_journal *journal)
1756 {
1757 	struct vdo_waiter *waiter = vdo_waitq_get_first_waiter(&journal->entry_waiters);
1758 	struct reference_updater *updater =
1759 		container_of(waiter, struct reference_updater, waiter);
1760 
1761 	return (updater->operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING);
1762 }
1763 
1764 /**
1765  * add_entries() - Add as many entries as possible from the queue of vios waiting to make entries.
1766  * @journal: The journal to which entries may be added.
1767  *
1768  * By processing the queue in order, we ensure that slab journal entries are made in the same order
1769  * as recovery journal entries for the same increment or decrement.
1770  */
add_entries(struct slab_journal * journal)1771 static void add_entries(struct slab_journal *journal)
1772 {
1773 	if (journal->adding_entries) {
1774 		/* Protect against re-entrancy. */
1775 		return;
1776 	}
1777 
1778 	journal->adding_entries = true;
1779 	while (vdo_waitq_has_waiters(&journal->entry_waiters)) {
1780 		struct slab_journal_block_header *header = &journal->tail_header;
1781 
1782 		if (journal->partial_write_in_progress ||
1783 		    (journal->slab->status == VDO_SLAB_REBUILDING)) {
1784 			/*
1785 			 * Don't add entries while rebuilding or while a partial write is
1786 			 * outstanding, as it could result in reference count corruption.
1787 			 */
1788 			break;
1789 		}
1790 
1791 		if (journal->waiting_to_commit) {
1792 			/*
1793 			 * If we are waiting for resources to write the tail block, and the tail
1794 			 * block is full, we can't make another entry.
1795 			 */
1796 			WRITE_ONCE(journal->events->tail_busy_count,
1797 				   journal->events->tail_busy_count + 1);
1798 			break;
1799 		} else if (is_next_entry_a_block_map_increment(journal) &&
1800 			   (header->entry_count >= journal->full_entries_per_block)) {
1801 			/*
1802 			 * The tail block does not have room for a block map increment, so commit
1803 			 * it now.
1804 			 */
1805 			commit_tail(journal);
1806 			if (journal->waiting_to_commit) {
1807 				WRITE_ONCE(journal->events->tail_busy_count,
1808 					   journal->events->tail_busy_count + 1);
1809 				break;
1810 			}
1811 		}
1812 
1813 		/* If the slab is over the blocking threshold, make the vio wait. */
1814 		if (requires_reaping(journal)) {
1815 			WRITE_ONCE(journal->events->blocked_count,
1816 				   journal->events->blocked_count + 1);
1817 			save_dirty_reference_blocks(journal->slab);
1818 			break;
1819 		}
1820 
1821 		if (header->entry_count == 0) {
1822 			struct journal_lock *lock =
1823 				get_lock(journal, header->sequence_number);
1824 
1825 			/*
1826 			 * Check if the on disk slab journal is full. Because of the blocking and
1827 			 * scrubbing thresholds, this should never happen.
1828 			 */
1829 			if (lock->count > 0) {
1830 				VDO_ASSERT_LOG_ONLY((journal->head + journal->size) == journal->tail,
1831 						    "New block has locks, but journal is not full");
1832 
1833 				/*
1834 				 * The blocking threshold must let the journal fill up if the new
1835 				 * block has locks; if the blocking threshold is smaller than the
1836 				 * journal size, the new block cannot possibly have locks already.
1837 				 */
1838 				VDO_ASSERT_LOG_ONLY((journal->blocking_threshold >= journal->size),
1839 						    "New block can have locks already iff blocking threshold is at the end of the journal");
1840 
1841 				WRITE_ONCE(journal->events->disk_full_count,
1842 					   journal->events->disk_full_count + 1);
1843 				save_dirty_reference_blocks(journal->slab);
1844 				break;
1845 			}
1846 
1847 			/*
1848 			 * Don't allow the new block to be reaped until all of the reference count
1849 			 * blocks are written and the journal block has been fully committed as
1850 			 * well.
1851 			 */
1852 			lock->count = journal->entries_per_block + 1;
1853 
1854 			if (header->sequence_number == 1) {
1855 				struct vdo_slab *slab = journal->slab;
1856 				block_count_t i;
1857 
1858 				/*
1859 				 * This is the first entry in this slab journal, ever. Dirty all of
1860 				 * the reference count blocks. Each will acquire a lock on the tail
1861 				 * block so that the journal won't be reaped until the reference
1862 				 * counts are initialized. The lock acquisition must be done by the
1863 				 * ref_counts since here we don't know how many reference blocks
1864 				 * the ref_counts has.
1865 				 */
1866 				for (i = 0; i < slab->reference_block_count; i++) {
1867 					slab->reference_blocks[i].slab_journal_lock = 1;
1868 					dirty_block(&slab->reference_blocks[i]);
1869 				}
1870 
1871 				adjust_slab_journal_block_reference(journal, 1,
1872 								    slab->reference_block_count);
1873 			}
1874 		}
1875 
1876 		vdo_waitq_notify_next_waiter(&journal->entry_waiters,
1877 					     add_entry_from_waiter, journal);
1878 	}
1879 
1880 	journal->adding_entries = false;
1881 
1882 	/* If there are no waiters, and we are flushing or saving, commit the tail block. */
1883 	if (vdo_is_state_draining(&journal->slab->state) &&
1884 	    !vdo_is_state_suspending(&journal->slab->state) &&
1885 	    !vdo_waitq_has_waiters(&journal->entry_waiters))
1886 		commit_tail(journal);
1887 }
1888 
1889 /**
1890  * reset_search_cursor() - Reset the free block search back to the first reference counter in the
1891  *                         first reference block of a slab.
1892  * @slab: The slab.
1893  */
reset_search_cursor(struct vdo_slab * slab)1894 static void reset_search_cursor(struct vdo_slab *slab)
1895 {
1896 	struct search_cursor *cursor = &slab->search_cursor;
1897 
1898 	cursor->block = cursor->first_block;
1899 	cursor->index = 0;
1900 	cursor->end_index = min_t(u32, COUNTS_PER_BLOCK, slab->block_count);
1901 }
1902 
1903 /**
1904  * advance_search_cursor() - Advance the search cursor to the start of the next reference block in
1905  *                           a slab.
1906  * @slab: The slab.
1907  *
1908  * Wraps around to the first reference block if the current block is the last reference block.
1909  *
1910  * Return: True unless the cursor was at the last reference block.
1911  */
advance_search_cursor(struct vdo_slab * slab)1912 static bool advance_search_cursor(struct vdo_slab *slab)
1913 {
1914 	struct search_cursor *cursor = &slab->search_cursor;
1915 
1916 	/*
1917 	 * If we just finished searching the last reference block, then wrap back around to the
1918 	 * start of the array.
1919 	 */
1920 	if (cursor->block == cursor->last_block) {
1921 		reset_search_cursor(slab);
1922 		return false;
1923 	}
1924 
1925 	/* We're not already at the end, so advance the cursor to the next block. */
1926 	cursor->block++;
1927 	cursor->index = cursor->end_index;
1928 
1929 	if (cursor->block == cursor->last_block) {
1930 		/* The last reference block will usually be a runt. */
1931 		cursor->end_index = slab->block_count;
1932 	} else {
1933 		cursor->end_index += COUNTS_PER_BLOCK;
1934 	}
1935 
1936 	return true;
1937 }
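/*
 * Illustrative example for advance_search_cursor() (added commentary): with
 * COUNTS_PER_BLOCK = C and a slab of 2C + 100 blocks, the cursor scans the
 * ranges [0, C), then [C, 2C), and finally the runt range [2C, 2C + 100);
 * the next advance wraps back to the first block and returns false.
 */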
1938 
1939 /**
1940  * vdo_adjust_reference_count_for_rebuild() - Adjust the reference count of a block during rebuild.
1941  * @depot: The slab depot.
1942  * @pbn: The physical block number to adjust.
1943  * @operation: The type of operation.
1944  *
1945  * Return: VDO_SUCCESS or an error.
1946  */
vdo_adjust_reference_count_for_rebuild(struct slab_depot * depot,physical_block_number_t pbn,enum journal_operation operation)1947 int vdo_adjust_reference_count_for_rebuild(struct slab_depot *depot,
1948 					   physical_block_number_t pbn,
1949 					   enum journal_operation operation)
1950 {
1951 	int result;
1952 	slab_block_number block_number;
1953 	struct reference_block *block;
1954 	struct vdo_slab *slab = vdo_get_slab(depot, pbn);
1955 	struct reference_updater updater = {
1956 		.operation = operation,
1957 		.increment = true,
1958 	};
1959 
1960 	result = slab_block_number_from_pbn(slab, pbn, &block_number);
1961 	if (result != VDO_SUCCESS)
1962 		return result;
1963 
1964 	block = get_reference_block(slab, block_number);
1965 	result = update_reference_count(slab, block, block_number, NULL,
1966 					&updater, !NORMAL_OPERATION, false, NULL);
1967 	if (result != VDO_SUCCESS)
1968 		return result;
1969 
1970 	dirty_block(block);
1971 	return VDO_SUCCESS;
1972 }
1973 
1974 /**
1975  * replay_reference_count_change() - Replay the reference count adjustment from a slab journal
1976  *                                   entry into the reference count for a block.
1977  * @slab: The slab.
1978  * @entry_point: The slab journal point for the entry.
1979  * @entry: The slab journal entry being replayed.
1980  *
1981  * The adjustment will be ignored if it was already recorded in the reference count.
1982  *
1983  * Return: VDO_SUCCESS or an error code.
1984  */
replay_reference_count_change(struct vdo_slab * slab,const struct journal_point * entry_point,struct slab_journal_entry entry)1985 static int replay_reference_count_change(struct vdo_slab *slab,
1986 					 const struct journal_point *entry_point,
1987 					 struct slab_journal_entry entry)
1988 {
1989 	int result;
1990 	struct reference_block *block = get_reference_block(slab, entry.sbn);
1991 	sector_count_t sector = (entry.sbn % COUNTS_PER_BLOCK) / COUNTS_PER_SECTOR;
1992 	struct reference_updater updater = {
1993 		.operation = entry.operation,
1994 		.increment = entry.increment,
1995 	};
1996 
1997 	if (!vdo_before_journal_point(&block->commit_points[sector], entry_point)) {
1998 		/* This entry is already reflected in the existing counts, so do nothing. */
1999 		return VDO_SUCCESS;
2000 	}
2001 
2002 	/* This entry is not yet counted in the reference counts. */
2003 	result = update_reference_count(slab, block, entry.sbn, entry_point,
2004 					&updater, !NORMAL_OPERATION, false, NULL);
2005 	if (result != VDO_SUCCESS)
2006 		return result;
2007 
2008 	dirty_block(block);
2009 	return VDO_SUCCESS;
2010 }
2011 
2012 /**
2013  * find_zero_byte_in_word() - Find the array index of the first zero byte in word-sized range of
2014  *                            reference counters.
2015  * @word_ptr: A pointer to the eight counter bytes to check.
2016  * @start_index: The array index corresponding to word_ptr[0].
2017  * @fail_index: The array index to return if no zero byte is found.
2018  *
2019  * The search does no bounds checking; the function relies on the array being sufficiently padded.
2020  *
2021  * Return: The array index of the first zero byte in the word, or the value passed as fail_index if
2022  *         no zero byte was found.
2023  */
find_zero_byte_in_word(const u8 * word_ptr,slab_block_number start_index,slab_block_number fail_index)2024 static inline slab_block_number find_zero_byte_in_word(const u8 *word_ptr,
2025 						       slab_block_number start_index,
2026 						       slab_block_number fail_index)
2027 {
2028 	u64 word = get_unaligned_le64(word_ptr);
2029 
2030 	/* This looks like a loop, but GCC will unroll the eight iterations for us. */
2031 	unsigned int offset;
2032 
2033 	for (offset = 0; offset < BYTES_PER_WORD; offset++) {
2034 		/* Assumes little-endian byte order, which we have on X86. */
2035 		if ((word & 0xFF) == 0)
2036 			return (start_index + offset);
2037 		word >>= 8;
2038 	}
2039 
2040 	return fail_index;
2041 }
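/*
 * Illustrative example for find_zero_byte_in_word() (added commentary): if the
 * eight counter bytes are { 3, 1, 0, 2, 0, 5, 7, 9 } and start_index is 40, the
 * first zero byte is at offset 2, so the result is 42; if none of the bytes
 * were zero, fail_index would be returned instead.
 */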
2042 
2043 /**
2044  * find_free_block() - Find the first block with a reference count of zero in the specified
2045  *                     range of reference counter indexes.
2046  * @slab: The slab counters to scan.
2047  * @index_ptr: A pointer to hold the array index of the free block.
2048  *
2049  * Return: True if a free block was found in the specified range.
2050  */
find_free_block(const struct vdo_slab * slab,slab_block_number * index_ptr)2051 static bool find_free_block(const struct vdo_slab *slab, slab_block_number *index_ptr)
2052 {
2053 	slab_block_number zero_index;
2054 	slab_block_number next_index = slab->search_cursor.index;
2055 	slab_block_number end_index = slab->search_cursor.end_index;
2056 	u8 *next_counter = &slab->counters[next_index];
2057 	u8 *end_counter = &slab->counters[end_index];
2058 
2059 	/*
2060 	 * Search every byte of the first unaligned word. (Array is padded so reading past end is
2061 	 * safe.)
2062 	 */
2063 	zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2064 	if (zero_index < end_index) {
2065 		*index_ptr = zero_index;
2066 		return true;
2067 	}
2068 
2069 	/*
2070 	 * On architectures where unaligned word access is expensive, this would be a good place to
2071 	 * advance to an alignment boundary.
2072 	 */
2073 	next_index += BYTES_PER_WORD;
2074 	next_counter += BYTES_PER_WORD;
2075 
2076 	/*
2077 	 * Now we're word-aligned; check a word at a time until we find a word containing a zero.
2078 	 * (Array is padded so reading past end is safe.)
2079 	 */
2080 	while (next_counter < end_counter) {
2081 		/*
2082 		 * The following code is currently an exact copy of the code preceding the loop,
2083 		 * but if you try to merge them by using a do loop, it runs slower because a jump
2084 		 * instruction gets added at the start of the iteration.
2085 		 */
2086 		zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2087 		if (zero_index < end_index) {
2088 			*index_ptr = zero_index;
2089 			return true;
2090 		}
2091 
2092 		next_index += BYTES_PER_WORD;
2093 		next_counter += BYTES_PER_WORD;
2094 	}
2095 
2096 	return false;
2097 }
2098 
2099 /**
2100  * search_current_reference_block() - Search the reference block currently saved in the search
2101  *                                    cursor for a reference count of zero, starting at the saved
2102  *                                    counter index.
2103  * @slab: The slab to search.
2104  * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2105  *
2106  * Return: True if an unreferenced counter was found.
2107  */
search_current_reference_block(const struct vdo_slab * slab,slab_block_number * free_index_ptr)2108 static bool search_current_reference_block(const struct vdo_slab *slab,
2109 					   slab_block_number *free_index_ptr)
2110 {
2111 	/* Don't bother searching if the current block is known to be full. */
2112 	return ((slab->search_cursor.block->allocated_count < COUNTS_PER_BLOCK) &&
2113 		find_free_block(slab, free_index_ptr));
2114 }
2115 
2116 /**
2117  * search_reference_blocks() - Search each reference block for a reference count of zero.
2118  * @slab: The slab to search.
2119  * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2120  *
2121  * Searches each reference block for a reference count of zero, starting at the reference block and
2122  * counter index saved in the search cursor and searching up to the end of the last reference
2123  * block. The search does not wrap.
2124  *
2125  * Return: True if an unreferenced counter was found.
2126  */
search_reference_blocks(struct vdo_slab * slab,slab_block_number * free_index_ptr)2127 static bool search_reference_blocks(struct vdo_slab *slab,
2128 				    slab_block_number *free_index_ptr)
2129 {
2130 	/* Start searching at the saved search position in the current block. */
2131 	if (search_current_reference_block(slab, free_index_ptr))
2132 		return true;
2133 
2134 	/* Search each reference block up to the end of the slab. */
2135 	while (advance_search_cursor(slab)) {
2136 		if (search_current_reference_block(slab, free_index_ptr))
2137 			return true;
2138 	}
2139 
2140 	return false;
2141 }
2142 
2143 /**
2144  * make_provisional_reference() - Do the bookkeeping for making a provisional reference.
2145  * @slab: The slab.
2146  * @block_number: The index for the physical block to reference.
2147  */
make_provisional_reference(struct vdo_slab * slab,slab_block_number block_number)2148 static void make_provisional_reference(struct vdo_slab *slab,
2149 				       slab_block_number block_number)
2150 {
2151 	struct reference_block *block = get_reference_block(slab, block_number);
2152 
2153 	/*
2154 	 * Make the initial transition from an unreferenced block to a
2155 	 * provisionally allocated block.
2156 	 */
2157 	slab->counters[block_number] = PROVISIONAL_REFERENCE_COUNT;
2158 
2159 	/* Account for the allocation. */
2160 	block->allocated_count++;
2161 	slab->free_blocks--;
2162 }
2163 
2164 /**
2165  * dirty_all_reference_blocks() - Mark all reference count blocks in a slab as dirty.
2166  * @slab: The slab.
2167  */
dirty_all_reference_blocks(struct vdo_slab * slab)2168 static void dirty_all_reference_blocks(struct vdo_slab *slab)
2169 {
2170 	block_count_t i;
2171 
2172 	for (i = 0; i < slab->reference_block_count; i++)
2173 		dirty_block(&slab->reference_blocks[i]);
2174 }
2175 
journal_points_equal(struct journal_point first,struct journal_point second)2176 static inline bool journal_points_equal(struct journal_point first,
2177 					struct journal_point second)
2178 {
2179 	return ((first.sequence_number == second.sequence_number) &&
2180 		(first.entry_count == second.entry_count));
2181 }
2182 
2183 /**
2184  * match_bytes() - Check an 8-byte word for bytes matching the value specified
2185  * @input: A word to examine the bytes of.
2186  * @match: The byte value sought.
2187  *
2188  * Return: 1 in each byte when the corresponding input byte matched, 0 otherwise.
2189  */
match_bytes(u64 input,u8 match)2190 static inline u64 match_bytes(u64 input, u8 match)
2191 {
2192 	u64 temp = input ^ (match * 0x0101010101010101ULL);
2193 	/* top bit of each byte is set iff top bit of temp byte is clear; rest are 0 */
2194 	u64 test_top_bits = ~temp & 0x8080808080808080ULL;
2195 	/* top bit of each byte is set iff low 7 bits of temp byte are clear; rest are useless */
2196 	u64 test_low_bits = 0x8080808080808080ULL - (temp & 0x7f7f7f7f7f7f7f7fULL);
2197 	/* return 1 when both tests indicate temp byte is 0 */
2198 	return (test_top_bits & test_low_bits) >> 7;
2199 }
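/*
 * Illustrative example for match_bytes() (added commentary): for
 * match_bytes(0x0005000500000005, 0x05), temp is zero in byte lanes 0, 4, and
 * 6; ANDing the two tests leaves the top bit set in exactly those lanes, and
 * the final shift yields 0x0001000100000001.
 */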
2200 
2201 /**
2202  * count_valid_references() - Process a newly loaded refcount array.
2203  * @counters: The array of counters from a metadata block.
2204  *
2205  * Scan an 8-byte-aligned array of counters, fixing up any provisional values that
2206  * weren't cleaned up at shutdown, changing them internally to zero.
2207  *
2208  * Return: The number of blocks with a non-zero reference count.
2209  */
count_valid_references(vdo_refcount_t * counters)2210 static unsigned int count_valid_references(vdo_refcount_t *counters)
2211 {
2212 	u64 *words = (u64 *)counters;
2213 	/* It's easier to count occurrences of a specific byte than its absences. */
2214 	unsigned int empty_count = 0;
2215 	/* For speed, we process 8 bytes at once. */
2216 	unsigned int words_left = COUNTS_PER_BLOCK / sizeof(u64);
2217 
2218 	/*
2219 	 * Sanity check assumptions used for optimizing this code: Counters are bytes. The counter
2220 	 * array is a multiple of the word size.
2221 	 */
2222 	BUILD_BUG_ON(sizeof(vdo_refcount_t) != 1);
2223 	BUILD_BUG_ON((COUNTS_PER_BLOCK % sizeof(u64)) != 0);
2224 
2225 	while (words_left > 0) {
2226 		/*
2227 		 * This is used effectively as 8 byte-size counters. Byte 0 counts how many words
2228 		 * had the target value found in byte 0, etc. We just have to avoid overflow.
2229 		 */
2230 		u64 split_count = 0;
2231 		/*
2232 		 * The counter "% 255" trick used below to fold split_count into empty_count
2233 		 * imposes a limit of 254 bytes examined each iteration of the outer loop. We
2234 		 * process a word at a time, so that limit gets rounded down to 31 u64 words.
2235 		 */
2236 		const unsigned int max_words_per_iteration = 254 / sizeof(u64);
2237 		unsigned int iter_words_left = min_t(unsigned int, words_left,
2238 						     max_words_per_iteration);
2239 
2240 		words_left -= iter_words_left;
2241 
2242 		while (iter_words_left--) {
2243 			u64 word = *words;
2244 			u64 temp;
2245 
2246 			/* First, if we have any provisional refcount values, clear them. */
2247 			temp = match_bytes(word, PROVISIONAL_REFERENCE_COUNT);
2248 			if (temp) {
2249 				/*
2250 				 * 'temp' has 0x01 bytes where 'word' has PROVISIONAL; this xor
2251 				 * will alter just those bytes, changing PROVISIONAL to EMPTY.
2252 				 */
2253 				word ^= temp * (PROVISIONAL_REFERENCE_COUNT ^ EMPTY_REFERENCE_COUNT);
2254 				*words = word;
2255 			}
2256 
2257 			/* Now count the EMPTY_REFERENCE_COUNT bytes, updating the 8 counters. */
2258 			split_count += match_bytes(word, EMPTY_REFERENCE_COUNT);
2259 			words++;
2260 		}
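		/*
		 * Added commentary: because 256 is congruent to 1 mod 255,
		 * split_count % 255 sums the eight byte-wide lanes; with at
		 * most 31 words per pass each lane is at most 31, so the true
		 * total (at most 248) is below 255 and is recovered exactly.
		 */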
2261 		empty_count += split_count % 255;
2262 	}
2263 
2264 	return COUNTS_PER_BLOCK - empty_count;
2265 }
2266 
2267 /**
2268  * unpack_reference_block() - Unpack reference counts blocks into the internal memory structure.
2269  * @packed: The written reference block to be unpacked.
2270  * @block: The internal reference block to be loaded.
2271  */
unpack_reference_block(struct packed_reference_block * packed,struct reference_block * block)2272 static void unpack_reference_block(struct packed_reference_block *packed,
2273 				   struct reference_block *block)
2274 {
2275 	sector_count_t i;
2276 	struct vdo_slab *slab = block->slab;
2277 	vdo_refcount_t *counters = get_reference_counters_for_block(block);
2278 
2279 	for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
2280 		struct packed_reference_sector *sector = &packed->sectors[i];
2281 
2282 		vdo_unpack_journal_point(&sector->commit_point, &block->commit_points[i]);
2283 		memcpy(counters + (i * COUNTS_PER_SECTOR), sector->counts,
2284 		       (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
2285 		/* The slab_journal_point must be the latest point found in any sector. */
2286 		if (vdo_before_journal_point(&slab->slab_journal_point,
2287 					     &block->commit_points[i]))
2288 			slab->slab_journal_point = block->commit_points[i];
2289 
2290 		if ((i > 0) &&
2291 		    !journal_points_equal(block->commit_points[0],
2292 					  block->commit_points[i])) {
2293 			size_t block_index = block - block->slab->reference_blocks;
2294 
2295 			vdo_log_warning("Torn write detected in sector %u of reference block %zu of slab %u",
2296 					i, block_index, block->slab->slab_number);
2297 		}
2298 	}
2299 
2300 	block->allocated_count = count_valid_references(counters);
2301 }
2302 
2303 /**
2304  * finish_reference_block_load() - After a reference block has been read, unpack it.
2305  * @completion: The VIO that just finished reading.
2306  */
finish_reference_block_load(struct vdo_completion * completion)2307 static void finish_reference_block_load(struct vdo_completion *completion)
2308 {
2309 	struct vio *vio = as_vio(completion);
2310 	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
2311 	struct reference_block *block = completion->parent;
2312 	struct vdo_slab *slab = block->slab;
2313 	unsigned int block_count = vio->io_size / VDO_BLOCK_SIZE;
2314 	unsigned int i;
2315 	char *data = vio->data;
2316 
2317 	for (i = 0; i < block_count; i++, block++, data += VDO_BLOCK_SIZE) {
2318 		struct packed_reference_block *packed = (struct packed_reference_block *) data;
2319 
2320 		unpack_reference_block(packed, block);
2321 		slab->free_blocks -= block->allocated_count;
2322 	}
2323 	return_vio_to_pool(pooled);
2324 	slab->active_count -= block_count;
2325 
2326 	check_if_slab_drained(slab);
2327 }
2328 
load_reference_block_endio(struct bio * bio)2329 static void load_reference_block_endio(struct bio *bio)
2330 {
2331 	struct vio *vio = bio->bi_private;
2332 	struct reference_block *block = vio->completion.parent;
2333 
2334 	continue_vio_after_io(vio, finish_reference_block_load,
2335 			      block->slab->allocator->thread_id);
2336 }
2337 
2338 /**
2339  * load_reference_block_group() - After a block waiter has gotten a VIO from the VIO pool, load
2340  *                                a set of blocks.
2341  * @waiter: The waiter of the first block to load.
2342  * @context: The VIO returned by the pool.
2343  */
load_reference_block_group(struct vdo_waiter * waiter,void * context)2344 static void load_reference_block_group(struct vdo_waiter *waiter, void *context)
2345 {
2346 	struct pooled_vio *pooled = context;
2347 	struct vio *vio = &pooled->vio;
2348 	struct reference_block *block =
2349 		container_of(waiter, struct reference_block, waiter);
2350 	u32 block_offset = block - block->slab->reference_blocks;
2351 	u32 max_block_count = block->slab->reference_block_count - block_offset;
2352 	u32 block_count = min_t(int, vio->block_count, max_block_count);
2353 
2354 	vio->completion.parent = block;
2355 	vdo_submit_metadata_vio_with_size(vio, block->slab->ref_counts_origin + block_offset,
2356 					  load_reference_block_endio, handle_io_error,
2357 					  REQ_OP_READ, block_count * VDO_BLOCK_SIZE);
2358 }
2359 
2360 /**
2361  * load_reference_blocks() - Load a slab's reference blocks from the underlying storage into a
2362  *                           pre-allocated reference counter.
2363  * @slab: The slab.
2364  */
load_reference_blocks(struct vdo_slab * slab)2365 static void load_reference_blocks(struct vdo_slab *slab)
2366 {
2367 	block_count_t i;
2368 	u64 blocks_per_vio = slab->allocator->refcount_blocks_per_big_vio;
2369 	struct vio_pool *pool = slab->allocator->refcount_big_vio_pool;
2370 
2371 	if (!pool) {
2372 		pool = slab->allocator->vio_pool;
2373 		blocks_per_vio = 1;
2374 	}
2375 
2376 	slab->free_blocks = slab->block_count;
2377 	slab->active_count = slab->reference_block_count;
2378 	for (i = 0; i < slab->reference_block_count; i += blocks_per_vio) {
2379 		struct vdo_waiter *waiter = &slab->reference_blocks[i].waiter;
2380 
2381 		waiter->callback = load_reference_block_group;
2382 		acquire_vio_from_pool(pool, waiter);
2383 	}
2384 }
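/*
 * Illustrative example for the two functions above (added commentary, assuming
 * each pooled vio covers four blocks): with refcount_blocks_per_big_vio = 4 and
 * a slab holding 10 reference blocks, waiters are enqueued for blocks 0, 4, and
 * 8, and load_reference_block_group() then issues reads of 4, 4, and 2 blocks,
 * the last group being clipped by max_block_count.
 */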
2385 
2386 /**
2387  * drain_slab() - Drain all reference count I/O.
2388  * @slab: The slab.
2389  *
2390  * Depending upon the type of drain being performed (as recorded in the slab's admin state), the
2391  * reference blocks may be loaded from disk or dirty reference blocks may be written out.
2392  */
drain_slab(struct vdo_slab * slab)2393 static void drain_slab(struct vdo_slab *slab)
2394 {
2395 	bool save;
2396 	bool load;
2397 	const struct admin_state_code *state = vdo_get_admin_state_code(&slab->state);
2398 
2399 	if (state == VDO_ADMIN_STATE_SUSPENDING)
2400 		return;
2401 
2402 	if ((state != VDO_ADMIN_STATE_REBUILDING) &&
2403 	    (state != VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING))
2404 		commit_tail(&slab->journal);
2405 
2406 	if ((state == VDO_ADMIN_STATE_RECOVERING) || (slab->counters == NULL))
2407 		return;
2408 
2409 	save = false;
2410 	load = slab->allocator->summary_entries[slab->slab_number].load_ref_counts;
2411 	if (state == VDO_ADMIN_STATE_SCRUBBING) {
2412 		if (load) {
2413 			load_reference_blocks(slab);
2414 			return;
2415 		}
2416 	} else if (state == VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING) {
2417 		if (!load) {
2418 			/* These reference counts were never written, so mark them all dirty. */
2419 			dirty_all_reference_blocks(slab);
2420 		}
2421 		save = true;
2422 	} else if (state == VDO_ADMIN_STATE_REBUILDING) {
2423 		/*
2424 		 * Write out the counters if the slab has written them before, or it has any
2425 		 * non-zero reference counts, or there are any slab journal blocks.
2426 		 */
2427 		block_count_t data_blocks = slab->allocator->depot->slab_config.data_blocks;
2428 
2429 		if (load || (slab->free_blocks != data_blocks) ||
2430 		    !is_slab_journal_blank(slab)) {
2431 			dirty_all_reference_blocks(slab);
2432 			save = true;
2433 		}
2434 	} else if (state == VDO_ADMIN_STATE_SAVING) {
2435 		save = (slab->status == VDO_SLAB_REBUILT);
2436 	} else {
2437 		vdo_finish_draining_with_result(&slab->state, VDO_SUCCESS);
2438 		return;
2439 	}
2440 
2441 	if (save)
2442 		save_dirty_reference_blocks(slab);
2443 }
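/*
 * Summary of drain_slab() above (added commentary): SUSPENDING returns
 * immediately; RECOVERING stops after committing the journal tail; SCRUBBING
 * loads the reference blocks only when the slab summary says they were written;
 * SAVE_FOR_SCRUBBING always saves, dirtying every block first if they were
 * never written; REBUILDING saves only when the slab has ever held meaningful
 * state; SAVING saves only rebuilt slabs; any other drain simply finishes.
 */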
2444 
allocate_slab_counters(struct vdo_slab * slab)2445 static int allocate_slab_counters(struct vdo_slab *slab)
2446 {
2447 	int result;
2448 	size_t index, bytes;
2449 
2450 	result = VDO_ASSERT(slab->reference_blocks == NULL,
2451 			    "vdo_slab %u doesn't allocate refcounts twice",
2452 			    slab->slab_number);
2453 	if (result != VDO_SUCCESS)
2454 		return result;
2455 
2456 	result = vdo_allocate(slab->reference_block_count, struct reference_block,
2457 			      __func__, &slab->reference_blocks);
2458 	if (result != VDO_SUCCESS)
2459 		return result;
2460 
2461 	/*
2462 	 * Allocate such that the runt slab has a full-length memory array, plus a little padding
2463 	 * so we can word-search even at the very end.
2464 	 */
2465 	bytes = (slab->reference_block_count * COUNTS_PER_BLOCK) + (2 * BYTES_PER_WORD);
2466 	result = vdo_allocate(bytes, vdo_refcount_t, "ref counts array",
2467 			      &slab->counters);
2468 	if (result != VDO_SUCCESS) {
2469 		vdo_free(vdo_forget(slab->reference_blocks));
2470 		return result;
2471 	}
2472 
2473 	slab->search_cursor.first_block = slab->reference_blocks;
2474 	slab->search_cursor.last_block = &slab->reference_blocks[slab->reference_block_count - 1];
2475 	reset_search_cursor(slab);
2476 
2477 	for (index = 0; index < slab->reference_block_count; index++) {
2478 		slab->reference_blocks[index] = (struct reference_block) {
2479 			.slab = slab,
2480 		};
2481 	}
2482 
2483 	return VDO_SUCCESS;
2484 }
2485 
allocate_counters_if_clean(struct vdo_slab * slab)2486 static int allocate_counters_if_clean(struct vdo_slab *slab)
2487 {
2488 	if (vdo_is_state_clean_load(&slab->state))
2489 		return allocate_slab_counters(slab);
2490 
2491 	return VDO_SUCCESS;
2492 }
2493 
finish_loading_journal(struct vdo_completion * completion)2494 static void finish_loading_journal(struct vdo_completion *completion)
2495 {
2496 	struct vio *vio = as_vio(completion);
2497 	struct slab_journal *journal = completion->parent;
2498 	struct vdo_slab *slab = journal->slab;
2499 	struct packed_slab_journal_block *block = (struct packed_slab_journal_block *) vio->data;
2500 	struct slab_journal_block_header header;
2501 
2502 	vdo_unpack_slab_journal_block_header(&block->header, &header);
2503 
2504 	/* FIXME: should it be an error if the following conditional fails? */
2505 	if ((header.metadata_type == VDO_METADATA_SLAB_JOURNAL) &&
2506 	    (header.nonce == slab->allocator->nonce)) {
2507 		journal->tail = header.sequence_number + 1;
2508 
2509 		/*
2510 		 * If the slab is clean, this implies the slab journal is empty, so advance the
2511 		 * head appropriately.
2512 		 */
2513 		journal->head = (slab->allocator->summary_entries[slab->slab_number].is_dirty ?
2514 				 header.head : journal->tail);
2515 		journal->tail_header = header;
2516 		initialize_journal_state(journal);
2517 	}
2518 
2519 	return_vio_to_pool(vio_as_pooled_vio(vio));
2520 	vdo_finish_loading_with_result(&slab->state, allocate_counters_if_clean(slab));
2521 }
2522 
read_slab_journal_tail_endio(struct bio * bio)2523 static void read_slab_journal_tail_endio(struct bio *bio)
2524 {
2525 	struct vio *vio = bio->bi_private;
2526 	struct slab_journal *journal = vio->completion.parent;
2527 
2528 	continue_vio_after_io(vio, finish_loading_journal,
2529 			      journal->slab->allocator->thread_id);
2530 }
2531 
handle_load_error(struct vdo_completion * completion)2532 static void handle_load_error(struct vdo_completion *completion)
2533 {
2534 	int result = completion->result;
2535 	struct slab_journal *journal = completion->parent;
2536 	struct vio *vio = as_vio(completion);
2537 
2538 	vio_record_metadata_io_error(vio);
2539 	return_vio_to_pool(vio_as_pooled_vio(vio));
2540 	vdo_finish_loading_with_result(&journal->slab->state, result);
2541 }
2542 
2543 /**
2544  * read_slab_journal_tail() - Read the slab journal tail block by using a vio acquired from the vio
2545  *                            pool.
2546  * @waiter: The vio pool waiter which has just been notified.
2547  * @context: The vio pool entry given to the waiter.
2548  *
2549  * This is the success callback from acquire_vio_from_pool() when loading a slab journal.
2550  */
read_slab_journal_tail(struct vdo_waiter * waiter,void * context)2551 static void read_slab_journal_tail(struct vdo_waiter *waiter, void *context)
2552 {
2553 	struct slab_journal *journal =
2554 		container_of(waiter, struct slab_journal, resource_waiter);
2555 	struct vdo_slab *slab = journal->slab;
2556 	struct pooled_vio *pooled = context;
2557 	struct vio *vio = &pooled->vio;
2558 	tail_block_offset_t last_commit_point =
2559 		slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
2560 
2561 	/*
2562 	 * Slab summary keeps the commit point offset, so the tail block is the block before that.
2563 	 * Calculation supports small journals in unit tests.
2564 	 */
2565 	tail_block_offset_t tail_block = ((last_commit_point == 0) ?
2566 					  (tail_block_offset_t)(journal->size - 1) :
2567 					  (last_commit_point - 1));
2568 
2569 	vio->completion.parent = journal;
2570 	vio->completion.callback_thread_id = slab->allocator->thread_id;
2571 	vdo_submit_metadata_vio(vio, slab->journal_origin + tail_block,
2572 				read_slab_journal_tail_endio, handle_load_error,
2573 				REQ_OP_READ);
2574 }
2575 
2576 /**
2577  * load_slab_journal() - Load a slab's journal by reading the journal's tail.
2578  * @slab: The slab.
2579  */
load_slab_journal(struct vdo_slab * slab)2580 static void load_slab_journal(struct vdo_slab *slab)
2581 {
2582 	struct slab_journal *journal = &slab->journal;
2583 	tail_block_offset_t last_commit_point;
2584 
2585 	last_commit_point = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
2586 	if ((last_commit_point == 0) &&
2587 	    !slab->allocator->summary_entries[slab->slab_number].load_ref_counts) {
2588 		/*
2589 		 * This slab claims that it has a tail block at (journal->size - 1), but a head of
2590 		 * 1. This is impossible, due to the scrubbing threshold, on a real system, so
2591 		 * don't bother reading the (bogus) data off disk.
2592 		 */
2593 		VDO_ASSERT_LOG_ONLY(((journal->size < 16) ||
2594 				     (journal->scrubbing_threshold < (journal->size - 1))),
2595 				    "Scrubbing threshold protects against reads of unwritten slab journal blocks");
2596 		vdo_finish_loading_with_result(&slab->state,
2597 					       allocate_counters_if_clean(slab));
2598 		return;
2599 	}
2600 
2601 	journal->resource_waiter.callback = read_slab_journal_tail;
2602 	acquire_vio_from_pool(slab->allocator->vio_pool, &journal->resource_waiter);
2603 }
2604 
register_slab_for_scrubbing(struct vdo_slab * slab,bool high_priority)2605 static void register_slab_for_scrubbing(struct vdo_slab *slab, bool high_priority)
2606 {
2607 	struct slab_scrubber *scrubber = &slab->allocator->scrubber;
2608 
2609 	VDO_ASSERT_LOG_ONLY((slab->status != VDO_SLAB_REBUILT),
2610 			    "slab to be scrubbed is unrecovered");
2611 
2612 	if (slab->status != VDO_SLAB_REQUIRES_SCRUBBING)
2613 		return;
2614 
2615 	list_del_init(&slab->allocq_entry);
2616 	if (!slab->was_queued_for_scrubbing) {
2617 		WRITE_ONCE(scrubber->slab_count, scrubber->slab_count + 1);
2618 		slab->was_queued_for_scrubbing = true;
2619 	}
2620 
2621 	if (high_priority) {
2622 		slab->status = VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING;
2623 		list_add_tail(&slab->allocq_entry, &scrubber->high_priority_slabs);
2624 		return;
2625 	}
2626 
2627 	list_add_tail(&slab->allocq_entry, &scrubber->slabs);
2628 }
2629 
2630 /* Queue a slab for allocation or scrubbing. */
queue_slab(struct vdo_slab * slab)2631 static void queue_slab(struct vdo_slab *slab)
2632 {
2633 	struct block_allocator *allocator = slab->allocator;
2634 	block_count_t free_blocks;
2635 	int result;
2636 
2637 	VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
2638 			    "a requeued slab must not already be on a list");
2639 
2640 	if (vdo_is_read_only(allocator->depot->vdo))
2641 		return;
2642 
2643 	free_blocks = slab->free_blocks;
2644 	result = VDO_ASSERT((free_blocks <= allocator->depot->slab_config.data_blocks),
2645 			    "rebuilt slab %u must have a valid free block count (has %llu, expected maximum %llu)",
2646 			    slab->slab_number, (unsigned long long) free_blocks,
2647 			    (unsigned long long) allocator->depot->slab_config.data_blocks);
2648 	if (result != VDO_SUCCESS) {
2649 		vdo_enter_read_only_mode(allocator->depot->vdo, result);
2650 		return;
2651 	}
2652 
2653 	if (slab->status != VDO_SLAB_REBUILT) {
2654 		register_slab_for_scrubbing(slab, false);
2655 		return;
2656 	}
2657 
2658 	if (!vdo_is_state_resuming(&slab->state)) {
2659 		/*
2660 		 * If the slab is resuming, we've already accounted for it here, so don't do it
2661 		 * again.
2662 		 * FIXME: under what situation would the slab be resuming here?
2663 		 */
2664 		WRITE_ONCE(allocator->allocated_blocks,
2665 			   allocator->allocated_blocks - free_blocks);
2666 		if (!is_slab_journal_blank(slab)) {
2667 			WRITE_ONCE(allocator->statistics.slabs_opened,
2668 				   allocator->statistics.slabs_opened + 1);
2669 		}
2670 	}
2671 
2672 	if (allocator->depot->vdo->suspend_type == VDO_ADMIN_STATE_SAVING)
2673 		reopen_slab_journal(slab);
2674 
2675 	prioritize_slab(slab);
2676 }
2677 
2678 /** Implements vdo_admin_initiator_fn. */
initiate_slab_action(struct admin_state * state)2679 static void initiate_slab_action(struct admin_state *state)
2680 {
2681 	struct vdo_slab *slab = container_of(state, struct vdo_slab, state);
2682 
2683 	if (vdo_is_state_draining(state)) {
2684 		const struct admin_state_code *operation = vdo_get_admin_state_code(state);
2685 
2686 		if (operation == VDO_ADMIN_STATE_SCRUBBING)
2687 			slab->status = VDO_SLAB_REBUILDING;
2688 
2689 		drain_slab(slab);
2690 		check_if_slab_drained(slab);
2691 		return;
2692 	}
2693 
2694 	if (vdo_is_state_loading(state)) {
2695 		load_slab_journal(slab);
2696 		return;
2697 	}
2698 
2699 	if (vdo_is_state_resuming(state)) {
2700 		queue_slab(slab);
2701 		vdo_finish_resuming(state);
2702 		return;
2703 	}
2704 
2705 	vdo_finish_operation(state, VDO_INVALID_ADMIN_STATE);
2706 }
2707 
2708 /**
2709  * get_next_slab() - Get the next slab to scrub.
2710  * @scrubber: The slab scrubber.
2711  *
2712  * Return: The next slab to scrub or NULL if there are none.
2713  */
get_next_slab(struct slab_scrubber * scrubber)2714 static struct vdo_slab *get_next_slab(struct slab_scrubber *scrubber)
2715 {
2716 	struct vdo_slab *slab;
2717 
2718 	slab = list_first_entry_or_null(&scrubber->high_priority_slabs,
2719 					struct vdo_slab, allocq_entry);
2720 	if (slab != NULL)
2721 		return slab;
2722 
2723 	return list_first_entry_or_null(&scrubber->slabs, struct vdo_slab,
2724 					allocq_entry);
2725 }
2726 
2727 /**
2728  * has_slabs_to_scrub() - Check whether a scrubber has slabs to scrub.
2729  * @scrubber: The scrubber to check.
2730  *
2731  * Return: True if the scrubber has slabs to scrub.
2732  */
has_slabs_to_scrub(struct slab_scrubber * scrubber)2733 static inline bool __must_check has_slabs_to_scrub(struct slab_scrubber *scrubber)
2734 {
2735 	return (get_next_slab(scrubber) != NULL);
2736 }
2737 
2738 /**
2739  * uninitialize_scrubber_vio() - Clean up the slab_scrubber's vio.
2740  * @scrubber: The scrubber.
2741  */
uninitialize_scrubber_vio(struct slab_scrubber * scrubber)2742 static void uninitialize_scrubber_vio(struct slab_scrubber *scrubber)
2743 {
2744 	vdo_free(vdo_forget(scrubber->vio.data));
2745 	free_vio_components(&scrubber->vio);
2746 }
2747 
2748 /**
2749  * finish_scrubbing() - Stop scrubbing, either because there are no more slabs to scrub or because
2750  *                      there's been an error.
2751  * @scrubber: The scrubber.
2752  * @result: The result of the scrubbing operation.
2753  */
finish_scrubbing(struct slab_scrubber * scrubber,int result)2754 static void finish_scrubbing(struct slab_scrubber *scrubber, int result)
2755 {
2756 	bool notify = vdo_waitq_has_waiters(&scrubber->waiters);
2757 	bool done = !has_slabs_to_scrub(scrubber);
2758 	struct block_allocator *allocator =
2759 		container_of(scrubber, struct block_allocator, scrubber);
2760 
2761 	if (done)
2762 		uninitialize_scrubber_vio(scrubber);
2763 
2764 	if (scrubber->high_priority_only) {
2765 		scrubber->high_priority_only = false;
2766 		vdo_fail_completion(vdo_forget(scrubber->vio.completion.parent), result);
2767 	} else if (done && (atomic_add_return(-1, &allocator->depot->zones_to_scrub) == 0)) {
2768 		/* All of our slabs were scrubbed, and we're the last allocator to finish. */
2769 		enum vdo_state prior_state =
2770 			atomic_cmpxchg(&allocator->depot->vdo->state, VDO_RECOVERING,
2771 				       VDO_DIRTY);
2772 
2773 		/*
2774 		 * To be safe, even if the CAS failed, ensure anything that follows is ordered with
2775 		 * respect to whatever state change did happen.
2776 		 */
2777 		smp_mb__after_atomic();
2778 
2779 		/*
2780 		 * We must check the VDO state here and not the depot's read_only_notifier since
2781 		 * the compare-and-swap above could have failed due to a read-only entry which our own
2782 		 * thread does not yet know about.
2783 		 */
2784 		if (prior_state == VDO_DIRTY)
2785 			vdo_log_info("VDO commencing normal operation");
2786 		else if (prior_state == VDO_RECOVERING)
2787 			vdo_log_info("Exiting recovery mode");
2788 		free_vio_pool(vdo_forget(allocator->refcount_big_vio_pool));
2789 	}
2790 
2791 	/*
2792 	 * Note that the scrubber has stopped, and inform anyone who might be waiting for that to
2793 	 * happen.
2794 	 */
2795 	if (!vdo_finish_draining(&scrubber->admin_state))
2796 		WRITE_ONCE(scrubber->admin_state.current_state,
2797 			   VDO_ADMIN_STATE_SUSPENDED);
2798 
2799 	/*
2800 	 * We can't notify waiters until after we've finished draining or they'll just requeue.
2801 	 * Fortunately if there were waiters, we can't have been freed yet.
2802 	 */
2803 	if (notify)
2804 		vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);
2805 }
2806 
2807 static void scrub_next_slab(struct slab_scrubber *scrubber);
2808 
2809 /**
2810  * slab_scrubbed() - Notify the scrubber that a slab has been scrubbed.
2811  * @completion: The slab rebuild completion.
2812  *
2813  * This callback is registered in apply_journal_entries().
2814  */
2815 static void slab_scrubbed(struct vdo_completion *completion)
2816 {
2817 	struct slab_scrubber *scrubber =
2818 		container_of(as_vio(completion), struct slab_scrubber, vio);
2819 	struct vdo_slab *slab = scrubber->slab;
2820 
2821 	slab->status = VDO_SLAB_REBUILT;
2822 	queue_slab(slab);
2823 	reopen_slab_journal(slab);
2824 	WRITE_ONCE(scrubber->slab_count, scrubber->slab_count - 1);
2825 	scrub_next_slab(scrubber);
2826 }
2827 
2828 /**
2829  * abort_scrubbing() - Abort scrubbing due to an error.
2830  * @scrubber: The slab scrubber.
2831  * @result: The error.
2832  */
2833 static void abort_scrubbing(struct slab_scrubber *scrubber, int result)
2834 {
2835 	vdo_enter_read_only_mode(scrubber->vio.completion.vdo, result);
2836 	finish_scrubbing(scrubber, result);
2837 }
2838 
2839 /**
2840  * handle_scrubber_error() - Handle errors while rebuilding a slab.
2841  * @completion: The slab rebuild completion.
2842  */
2843 static void handle_scrubber_error(struct vdo_completion *completion)
2844 {
2845 	struct vio *vio = as_vio(completion);
2846 
2847 	vio_record_metadata_io_error(vio);
2848 	abort_scrubbing(container_of(vio, struct slab_scrubber, vio),
2849 			completion->result);
2850 }
2851 
2852 /**
2853  * apply_block_entries() - Apply all the entries in a block to the reference counts.
2854  * @block: A block with entries to apply.
2855  * @entry_count: The number of entries to apply.
2856  * @block_number: The sequence number of the block.
2857  * @slab: The slab to apply the entries to.
2858  *
2859  * Return: VDO_SUCCESS or an error code.
2860  */
2861 static int apply_block_entries(struct packed_slab_journal_block *block,
2862 			       journal_entry_count_t entry_count,
2863 			       sequence_number_t block_number, struct vdo_slab *slab)
2864 {
2865 	struct journal_point entry_point = {
2866 		.sequence_number = block_number,
2867 		.entry_count = 0,
2868 	};
2869 	int result;
2870 	slab_block_number max_sbn = slab->end - slab->start;
2871 
2872 	while (entry_point.entry_count < entry_count) {
2873 		struct slab_journal_entry entry =
2874 			vdo_decode_slab_journal_entry(block, entry_point.entry_count);
2875 
2876 		if (entry.sbn > max_sbn) {
2877 			/* This entry is out of bounds. */
2878 			return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
2879 						      "vdo_slab journal entry (%llu, %u) had invalid offset %u in slab (size %u blocks)",
2880 						      (unsigned long long) block_number,
2881 						      entry_point.entry_count,
2882 						      entry.sbn, max_sbn);
2883 		}
2884 
2885 		result = replay_reference_count_change(slab, &entry_point, entry);
2886 		if (result != VDO_SUCCESS) {
2887 			vdo_log_error_strerror(result,
2888 					       "vdo_slab journal entry (%llu, %u) (%s of offset %u) could not be applied in slab %u",
2889 					       (unsigned long long) block_number,
2890 					       entry_point.entry_count,
2891 					       vdo_get_journal_operation_name(entry.operation),
2892 					       entry.sbn, slab->slab_number);
2893 			return result;
2894 		}
2895 		entry_point.entry_count++;
2896 	}
2897 
2898 	return VDO_SUCCESS;
2899 }
2900 
2901 /**
2902  * apply_journal_entries() - Find the relevant vio of the slab journal and apply all valid entries.
2903  * @completion: The metadata read vio completion.
2904  *
2905  * This is a callback registered in start_scrubbing().
2906  */
2907 static void apply_journal_entries(struct vdo_completion *completion)
2908 {
2909 	int result;
2910 	struct slab_scrubber *scrubber =
2911 		container_of(as_vio(completion), struct slab_scrubber, vio);
2912 	struct vdo_slab *slab = scrubber->slab;
2913 	struct slab_journal *journal = &slab->journal;
2914 
2915 	/* Find the boundaries of the useful part of the journal. */
2916 	sequence_number_t tail = journal->tail;
2917 	tail_block_offset_t end_index = (tail - 1) % journal->size;
2918 	char *end_data = scrubber->vio.data + (end_index * VDO_BLOCK_SIZE);
2919 	struct packed_slab_journal_block *end_block =
2920 		(struct packed_slab_journal_block *) end_data;
2921 
2922 	sequence_number_t head = __le64_to_cpu(end_block->header.head);
2923 	tail_block_offset_t head_index = head % journal->size;
2924 	block_count_t index = head_index;
2925 
2926 	struct journal_point ref_counts_point = slab->slab_journal_point;
2927 	struct journal_point last_entry_applied = ref_counts_point;
2928 	sequence_number_t sequence;
2929 
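	/*
	 * The scrubber's vio holds the whole slab journal in on-disk order, so the block
	 * recording sequence number N lives at offset N % journal->size in the buffer.
	 */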
2930 	for (sequence = head; sequence < tail; sequence++) {
2931 		char *block_data = scrubber->vio.data + (index * VDO_BLOCK_SIZE);
2932 		struct packed_slab_journal_block *block =
2933 			(struct packed_slab_journal_block *) block_data;
2934 		struct slab_journal_block_header header;
2935 
2936 		vdo_unpack_slab_journal_block_header(&block->header, &header);
2937 
2938 		if ((header.nonce != slab->allocator->nonce) ||
2939 		    (header.metadata_type != VDO_METADATA_SLAB_JOURNAL) ||
2940 		    (header.sequence_number != sequence) ||
2941 		    (header.entry_count > journal->entries_per_block) ||
2942 		    (header.has_block_map_increments &&
2943 		     (header.entry_count > journal->full_entries_per_block))) {
2944 			/* The block is not what we expect it to be. */
2945 			vdo_log_error("vdo_slab journal block for slab %u was invalid",
2946 				      slab->slab_number);
2947 			abort_scrubbing(scrubber, VDO_CORRUPT_JOURNAL);
2948 			return;
2949 		}
2950 
2951 		result = apply_block_entries(block, header.entry_count, sequence, slab);
2952 		if (result != VDO_SUCCESS) {
2953 			abort_scrubbing(scrubber, result);
2954 			return;
2955 		}
2956 
2957 		last_entry_applied.sequence_number = sequence;
2958 		last_entry_applied.entry_count = header.entry_count - 1;
2959 		index++;
2960 		if (index == journal->size)
2961 			index = 0;
2962 	}
2963 
2964 	/*
2965 	 * At the end of rebuild, the reference counters should be accurate to the end of the
2966 	 * journal we just applied.
2967 	 */
2968 	result = VDO_ASSERT(!vdo_before_journal_point(&last_entry_applied,
2969 						      &ref_counts_point),
2970 			    "Refcounts are not more accurate than the slab journal");
2971 	if (result != VDO_SUCCESS) {
2972 		abort_scrubbing(scrubber, result);
2973 		return;
2974 	}
2975 
2976 	/* Save out the rebuilt reference blocks. */
2977 	vdo_prepare_completion(completion, slab_scrubbed, handle_scrubber_error,
2978 			       slab->allocator->thread_id, completion->parent);
2979 	vdo_start_operation_with_waiter(&slab->state,
2980 					VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING,
2981 					completion, initiate_slab_action);
2982 }
2983 
2984 static void read_slab_journal_endio(struct bio *bio)
2985 {
2986 	struct vio *vio = bio->bi_private;
2987 	struct slab_scrubber *scrubber = container_of(vio, struct slab_scrubber, vio);
2988 
2989 	continue_vio_after_io(bio->bi_private, apply_journal_entries,
2990 			      scrubber->slab->allocator->thread_id);
2991 }
2992 
2993 /**
2994  * start_scrubbing() - Read the current slab's journal from disk now that it has been flushed.
2995  * @completion: The scrubber's vio completion.
2996  *
2997  * This callback is registered in scrub_next_slab().
2998  */
2999 static void start_scrubbing(struct vdo_completion *completion)
3000 {
3001 	struct slab_scrubber *scrubber =
3002 		container_of(as_vio(completion), struct slab_scrubber, vio);
3003 	struct vdo_slab *slab = scrubber->slab;
3004 
3005 	if (!slab->allocator->summary_entries[slab->slab_number].is_dirty) {
3006 		slab_scrubbed(completion);
3007 		return;
3008 	}
3009 
3010 	vdo_submit_metadata_vio(&scrubber->vio, slab->journal_origin,
3011 				read_slab_journal_endio, handle_scrubber_error,
3012 				REQ_OP_READ);
3013 }
3014 
3015 /**
3016  * scrub_next_slab() - Scrub the next slab if there is one.
3017  * @scrubber: The scrubber.
3018  */
3019 static void scrub_next_slab(struct slab_scrubber *scrubber)
3020 {
3021 	struct vdo_completion *completion = &scrubber->vio.completion;
3022 	struct vdo_slab *slab;
3023 
3024 	/*
3025 	 * Note: this notify call is only safe because scrubbing can only be started when
3026 	 * the VDO is quiescent.
3027 	 */
3028 	vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);
3029 
3030 	if (vdo_is_read_only(completion->vdo)) {
3031 		finish_scrubbing(scrubber, VDO_READ_ONLY);
3032 		return;
3033 	}
3034 
3035 	slab = get_next_slab(scrubber);
3036 	if ((slab == NULL) ||
3037 	    (scrubber->high_priority_only && list_empty(&scrubber->high_priority_slabs))) {
3038 		finish_scrubbing(scrubber, VDO_SUCCESS);
3039 		return;
3040 	}
3041 
3042 	if (vdo_finish_draining(&scrubber->admin_state))
3043 		return;
3044 
3045 	list_del_init(&slab->allocq_entry);
3046 	scrubber->slab = slab;
3047 	vdo_prepare_completion(completion, start_scrubbing, handle_scrubber_error,
3048 			       slab->allocator->thread_id, completion->parent);
3049 	vdo_start_operation_with_waiter(&slab->state, VDO_ADMIN_STATE_SCRUBBING,
3050 					completion, initiate_slab_action);
3051 }
3052 
3053 /**
3054  * scrub_slabs() - Scrub all of an allocator's slabs that are eligible for scrubbing.
3055  * @allocator: The block_allocator to scrub.
3056  * @parent: The completion to notify when scrubbing is done, implies high_priority, may be NULL.
3057  */
3058 static void scrub_slabs(struct block_allocator *allocator, struct vdo_completion *parent)
3059 {
3060 	struct slab_scrubber *scrubber = &allocator->scrubber;
3061 
3062 	scrubber->vio.completion.parent = parent;
3063 	scrubber->high_priority_only = (parent != NULL);
3064 	if (!has_slabs_to_scrub(scrubber)) {
3065 		finish_scrubbing(scrubber, VDO_SUCCESS);
3066 		return;
3067 	}
3068 
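	/*
	 * If a parent is waiting, there are no high-priority slabs queued, and the priority
	 * table has no slabs to allocate from, promote the next slab to high priority so that
	 * this pass scrubs at least one slab before the parent is notified.
	 */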
3069 	if (scrubber->high_priority_only &&
3070 	    vdo_is_priority_table_empty(allocator->prioritized_slabs) &&
3071 	    list_empty(&scrubber->high_priority_slabs))
3072 		register_slab_for_scrubbing(get_next_slab(scrubber), true);
3073 
3074 	vdo_resume_if_quiescent(&scrubber->admin_state);
3075 	scrub_next_slab(scrubber);
3076 }
3077 
3078 static inline void assert_on_allocator_thread(thread_id_t thread_id,
3079 					      const char *function_name)
3080 {
3081 	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == thread_id),
3082 			    "%s called on correct thread", function_name);
3083 }
3084 
3085 static void register_slab_with_allocator(struct block_allocator *allocator,
3086 					 struct vdo_slab *slab)
3087 {
3088 	allocator->slab_count++;
3089 	allocator->last_slab = slab->slab_number;
3090 }
3091 
3092 /**
3093  * get_depot_slab_iterator() - Return a slab_iterator over the slabs in a slab_depot.
3094  * @depot: The depot over which to iterate.
3095  * @start: The number of the slab to start iterating from.
3096  * @end: The number of the last slab which may be returned.
3097  * @stride: The difference in slab number between successive slabs.
3098  *
3099  * Iteration always occurs from higher to lower numbered slabs.
3100  *
3101  * Return: An initialized iterator structure.
3102  */
3103 static struct slab_iterator get_depot_slab_iterator(struct slab_depot *depot,
3104 						    slab_count_t start, slab_count_t end,
3105 						    slab_count_t stride)
3106 {
3107 	struct vdo_slab **slabs = depot->slabs;
3108 
3109 	return (struct slab_iterator) {
3110 		.slabs = slabs,
3111 		.next = (((slabs == NULL) || (start < end)) ? NULL : slabs[start]),
3112 		.end = end,
3113 		.stride = stride,
3114 	};
3115 }
3116 
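/*
 * Get an iterator over all of the slabs owned by a single allocator: from the allocator's
 * highest numbered slab down to its zone number, stepping by the depot's zone count, since
 * slabs are assigned to zones round-robin by slab number.
 */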
3117 static struct slab_iterator get_slab_iterator(const struct block_allocator *allocator)
3118 {
3119 	return get_depot_slab_iterator(allocator->depot, allocator->last_slab,
3120 				       allocator->zone_number,
3121 				       allocator->depot->zone_count);
3122 }
3123 
3124 /**
3125  * next_slab() - Get the next slab from a slab_iterator and advance the iterator
3126  * @iterator: The slab_iterator.
3127  *
3128  * Return: The next slab or NULL if the iterator is exhausted.
3129  */
3130 static struct vdo_slab *next_slab(struct slab_iterator *iterator)
3131 {
3132 	struct vdo_slab *slab = iterator->next;
3133 
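	/*
	 * The iterator steps downward by 'stride'; once the next step would drop below 'end',
	 * there are no more slabs to visit.
	 */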
3134 	if ((slab == NULL) || (slab->slab_number < iterator->end + iterator->stride))
3135 		iterator->next = NULL;
3136 	else
3137 		iterator->next = iterator->slabs[slab->slab_number - iterator->stride];
3138 
3139 	return slab;
3140 }
3141 
3142 /**
3143  * abort_waiter() - Abort vios waiting to make journal entries when read-only.
3144  * @waiter: A waiting data_vio.
3145  * @context: Not used.
3146  *
3147  * This callback is invoked on all vios waiting to make slab journal entries after the VDO has gone
3148  * into read-only mode. Implements waiter_callback_fn.
3149  */
3150 static void abort_waiter(struct vdo_waiter *waiter, void __always_unused *context)
3151 {
3152 	struct reference_updater *updater =
3153 		container_of(waiter, struct reference_updater, waiter);
3154 	struct data_vio *data_vio = data_vio_from_reference_updater(updater);
3155 
3156 	if (updater->increment) {
3157 		continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
3158 		return;
3159 	}
3160 
3161 	vdo_continue_completion(&data_vio->decrement_completion, VDO_READ_ONLY);
3162 }
3163 
3164 /* Implements vdo_read_only_notification_fn. */
3165 static void notify_block_allocator_of_read_only_mode(void *listener,
3166 						     struct vdo_completion *parent)
3167 {
3168 	struct block_allocator *allocator = listener;
3169 	struct slab_iterator iterator;
3170 
3171 	assert_on_allocator_thread(allocator->thread_id, __func__);
3172 	iterator = get_slab_iterator(allocator);
3173 	while (iterator.next != NULL) {
3174 		struct vdo_slab *slab = next_slab(&iterator);
3175 
3176 		vdo_waitq_notify_all_waiters(&slab->journal.entry_waiters,
3177 					     abort_waiter, &slab->journal);
3178 		check_if_slab_drained(slab);
3179 	}
3180 
3181 	vdo_finish_completion(parent);
3182 }
3183 
3184 /**
3185  * vdo_acquire_provisional_reference() - Acquire a provisional reference on behalf of a PBN lock if
3186  *                                       the block it locks is unreferenced.
3187  * @slab: The slab which contains the block.
3188  * @pbn: The physical block to reference.
3189  * @lock: The lock.
3190  *
3191  * Return: VDO_SUCCESS or an error.
3192  */
3193 int vdo_acquire_provisional_reference(struct vdo_slab *slab, physical_block_number_t pbn,
3194 				      struct pbn_lock *lock)
3195 {
3196 	slab_block_number block_number;
3197 	int result;
3198 
3199 	if (vdo_pbn_lock_has_provisional_reference(lock))
3200 		return VDO_SUCCESS;
3201 
3202 	if (!is_slab_open(slab))
3203 		return VDO_INVALID_ADMIN_STATE;
3204 
3205 	result = slab_block_number_from_pbn(slab, pbn, &block_number);
3206 	if (result != VDO_SUCCESS)
3207 		return result;
3208 
3209 	if (slab->counters[block_number] == EMPTY_REFERENCE_COUNT) {
3210 		make_provisional_reference(slab, block_number);
3211 		if (lock != NULL)
3212 			vdo_assign_pbn_lock_provisional_reference(lock);
3213 	}
3214 
3215 	if (vdo_pbn_lock_has_provisional_reference(lock))
3216 		adjust_free_block_count(slab, false);
3217 
3218 	return VDO_SUCCESS;
3219 }
3220 
3221 static int __must_check allocate_slab_block(struct vdo_slab *slab,
3222 					    physical_block_number_t *block_number_ptr)
3223 {
3224 	slab_block_number free_index;
3225 
3226 	if (!is_slab_open(slab))
3227 		return VDO_INVALID_ADMIN_STATE;
3228 
3229 	if (!search_reference_blocks(slab, &free_index))
3230 		return VDO_NO_SPACE;
3231 
3232 	VDO_ASSERT_LOG_ONLY((slab->counters[free_index] == EMPTY_REFERENCE_COUNT),
3233 			    "free block must have ref count of zero");
3234 	make_provisional_reference(slab, free_index);
3235 	adjust_free_block_count(slab, false);
3236 
3237 	/*
3238 	 * Update the search hint so the next search will start at the array index just past the
3239 	 * free block we just found.
3240 	 */
3241 	slab->search_cursor.index = (free_index + 1);
3242 
3243 	*block_number_ptr = slab->start + free_index;
3244 	return VDO_SUCCESS;
3245 }
3246 
3247 /**
3248  * open_slab() - Prepare a slab to be allocated from.
3249  * @slab: The slab.
3250  */
3251 static void open_slab(struct vdo_slab *slab)
3252 {
3253 	reset_search_cursor(slab);
3254 	if (is_slab_journal_blank(slab)) {
3255 		WRITE_ONCE(slab->allocator->statistics.slabs_opened,
3256 			   slab->allocator->statistics.slabs_opened + 1);
3257 		dirty_all_reference_blocks(slab);
3258 	} else {
3259 		WRITE_ONCE(slab->allocator->statistics.slabs_reopened,
3260 			   slab->allocator->statistics.slabs_reopened + 1);
3261 	}
3262 
3263 	slab->allocator->open_slab = slab;
3264 }
3265 
3266 
3267 /*
3268  * The block allocated will have a provisional reference and the reference must be either confirmed
3269  * with a subsequent increment or vacated with a subsequent decrement via
3270  * vdo_release_block_reference().
3271  */
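/*
 * A minimal, hypothetical caller sketch (for illustration only; error handling and the caller's
 * own bookkeeping are elided, and decided_not_to_use_the_block stands in for caller logic):
 *
 *	physical_block_number_t pbn;
 *	int result = vdo_allocate_block(allocator, &pbn);
 *
 *	if (result != VDO_SUCCESS)
 *		return result;
 *	if (decided_not_to_use_the_block)
 *		result = vdo_release_block_reference(allocator, pbn);
 */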
3272 int vdo_allocate_block(struct block_allocator *allocator,
3273 		       physical_block_number_t *block_number_ptr)
3274 {
3275 	int result;
3276 
3277 	if (allocator->open_slab != NULL) {
3278 		/* Try to allocate the next block in the currently open slab. */
3279 		result = allocate_slab_block(allocator->open_slab, block_number_ptr);
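		/*
		 * Anything other than VDO_NO_SPACE, including success, is returned as-is; only
		 * an exhausted open slab falls through to open another slab.
		 */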
3280 		if ((result == VDO_SUCCESS) || (result != VDO_NO_SPACE))
3281 			return result;
3282 
3283 		/* Put the exhausted open slab back into the priority table. */
3284 		prioritize_slab(allocator->open_slab);
3285 	}
3286 
3287 	/* Remove the highest priority slab from the priority table and make it the open slab. */
3288 	open_slab(list_entry(vdo_priority_table_dequeue(allocator->prioritized_slabs),
3289 			     struct vdo_slab, allocq_entry));
3290 
3291 	/*
3292 	 * Try allocating again. If we're out of space immediately after opening a slab, then every
3293 	 * slab must be fully allocated.
3294 	 */
3295 	return allocate_slab_block(allocator->open_slab, block_number_ptr);
3296 }
3297 
3298 /**
3299  * vdo_enqueue_clean_slab_waiter() - Wait for a clean slab.
3300  * @allocator: The block_allocator on which to wait.
3301  * @waiter: The waiter.
3302  *
3303  * Return: VDO_SUCCESS if the waiter was queued, VDO_NO_SPACE if there are no slabs to scrub, and
3304  *         some other error otherwise.
3305  */
3306 int vdo_enqueue_clean_slab_waiter(struct block_allocator *allocator,
3307 				  struct vdo_waiter *waiter)
3308 {
3309 	if (vdo_is_read_only(allocator->depot->vdo))
3310 		return VDO_READ_ONLY;
3311 
3312 	if (vdo_is_state_quiescent(&allocator->scrubber.admin_state))
3313 		return VDO_NO_SPACE;
3314 
3315 	vdo_waitq_enqueue_waiter(&allocator->scrubber.waiters, waiter);
3316 	return VDO_SUCCESS;
3317 }
3318 
3319 /**
3320  * vdo_modify_reference_count() - Modify the reference count of a block by first making a slab
3321  *                                journal entry and then updating the reference counter.
3322  * @completion: The data_vio completion for which to add the entry.
3323  * @updater: Which of the data_vio's reference updaters is being submitted.
3324  */
3325 void vdo_modify_reference_count(struct vdo_completion *completion,
3326 				struct reference_updater *updater)
3327 {
3328 	struct vdo_slab *slab = vdo_get_slab(completion->vdo->depot, updater->zpbn.pbn);
3329 
3330 	if (!is_slab_open(slab)) {
3331 		vdo_continue_completion(completion, VDO_INVALID_ADMIN_STATE);
3332 		return;
3333 	}
3334 
3335 	if (vdo_is_read_only(completion->vdo)) {
3336 		vdo_continue_completion(completion, VDO_READ_ONLY);
3337 		return;
3338 	}
3339 
3340 	vdo_waitq_enqueue_waiter(&slab->journal.entry_waiters, &updater->waiter);
3341 	if ((slab->status != VDO_SLAB_REBUILT) && requires_reaping(&slab->journal))
3342 		register_slab_for_scrubbing(slab, true);
3343 
3344 	add_entries(&slab->journal);
3345 }
3346 
3347 /* Release an unused provisional reference. */
3348 int vdo_release_block_reference(struct block_allocator *allocator,
3349 				physical_block_number_t pbn)
3350 {
3351 	struct reference_updater updater;
3352 
3353 	if (pbn == VDO_ZERO_BLOCK)
3354 		return VDO_SUCCESS;
3355 
3356 	updater = (struct reference_updater) {
3357 		.operation = VDO_JOURNAL_DATA_REMAPPING,
3358 		.increment = false,
3359 		.zpbn = {
3360 			.pbn = pbn,
3361 		},
3362 	};
3363 
3364 	return adjust_reference_count(vdo_get_slab(allocator->depot, pbn),
3365 				      &updater, NULL);
3366 }
3367 
3368 /*
3369  * This is a min_heap callback function that orders slab_status structures using the 'is_clean' field as
3370  * the primary key and the 'emptiness' field as the secondary key.
3371  *
3372  * Slabs need to be pushed onto the lists in the same order they are to be popped off. Popping
3373  * should always get the most empty first, so pushing should be from most empty to least empty.
3374  * Thus, the ordering is reversed from the usual sense since min_heap returns smaller elements
3375  * before larger ones.
3376  */
3377 static bool slab_status_is_less_than(const void *item1, const void *item2,
3378 					void __always_unused *args)
3379 {
3380 	const struct slab_status *info1 = item1;
3381 	const struct slab_status *info2 = item2;
3382 
3383 	if (info1->is_clean != info2->is_clean)
3384 		return info1->is_clean;
3385 	if (info1->emptiness != info2->emptiness)
3386 		return info1->emptiness > info2->emptiness;
3387 	return info1->slab_number < info2->slab_number;
3388 }
3389 
3390 static const struct min_heap_callbacks slab_status_min_heap = {
3391 	.less = slab_status_is_less_than,
3392 	.swp = NULL,
3393 };
3394 
3395 /* Inform the slab actor that an action has finished on some slab; used by apply_to_slabs(). */
3396 static void slab_action_callback(struct vdo_completion *completion)
3397 {
3398 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3399 	struct slab_actor *actor = &allocator->slab_actor;
3400 
3401 	if (--actor->slab_action_count == 0) {
3402 		actor->callback(completion);
3403 		return;
3404 	}
3405 
3406 	vdo_reset_completion(completion);
3407 }
3408 
3409 /* Preserve the error from part of an action and continue. */
3410 static void handle_operation_error(struct vdo_completion *completion)
3411 {
3412 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3413 
3414 	if (allocator->state.waiter != NULL)
3415 		vdo_set_completion_result(allocator->state.waiter, completion->result);
3416 	completion->callback(completion);
3417 }
3418 
3419 /* Perform an action on each of an allocator's slabs in parallel. */
3420 static void apply_to_slabs(struct block_allocator *allocator, vdo_action_fn callback)
3421 {
3422 	struct slab_iterator iterator;
3423 
3424 	vdo_prepare_completion(&allocator->completion, slab_action_callback,
3425 			       handle_operation_error, allocator->thread_id, NULL);
3426 	allocator->completion.requeue = false;
3427 
3428 	/*
3429 	 * Since we are going to dequeue all of the slabs, the open slab will become invalid, so
3430 	 * clear it.
3431 	 */
3432 	allocator->open_slab = NULL;
3433 
3434 	/* Ensure that we don't finish before we're done starting. */
3435 	allocator->slab_actor = (struct slab_actor) {
3436 		.slab_action_count = 1,
3437 		.callback = callback,
3438 	};
3439 
3440 	iterator = get_slab_iterator(allocator);
3441 	while (iterator.next != NULL) {
3442 		const struct admin_state_code *operation =
3443 			vdo_get_admin_state_code(&allocator->state);
3444 		struct vdo_slab *slab = next_slab(&iterator);
3445 
3446 		list_del_init(&slab->allocq_entry);
3447 		allocator->slab_actor.slab_action_count++;
3448 		vdo_start_operation_with_waiter(&slab->state, operation,
3449 						&allocator->completion,
3450 						initiate_slab_action);
3451 	}
3452 
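	/*
	 * Drop the initial count taken above; if every slab operation has already completed
	 * (or there were no slabs), this invokes the final callback.
	 */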
3453 	slab_action_callback(&allocator->completion);
3454 }
3455 
3456 static void finish_loading_allocator(struct vdo_completion *completion)
3457 {
3458 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3459 	const struct admin_state_code *operation =
3460 		vdo_get_admin_state_code(&allocator->state);
3461 
3462 	if (allocator->eraser != NULL)
3463 		dm_kcopyd_client_destroy(vdo_forget(allocator->eraser));
3464 
3465 	if (operation == VDO_ADMIN_STATE_LOADING_FOR_RECOVERY) {
3466 		void *context =
3467 			vdo_get_current_action_context(allocator->depot->action_manager);
3468 
3469 		vdo_replay_into_slab_journals(allocator, context);
3470 		return;
3471 	}
3472 
3473 	vdo_finish_loading(&allocator->state);
3474 }
3475 
3476 static void erase_next_slab_journal(struct block_allocator *allocator);
3477 
3478 static void copy_callback(int read_err, unsigned long write_err, void *context)
3479 {
3480 	struct block_allocator *allocator = context;
3481 	int result = (((read_err == 0) && (write_err == 0)) ? VDO_SUCCESS : -EIO);
3482 
3483 	if (result != VDO_SUCCESS) {
3484 		vdo_fail_completion(&allocator->completion, result);
3485 		return;
3486 	}
3487 
3488 	erase_next_slab_journal(allocator);
3489 }
3490 
3491 /* erase_next_slab_journal() - Erase the next slab journal. */
3492 static void erase_next_slab_journal(struct block_allocator *allocator)
3493 {
3494 	struct vdo_slab *slab;
3495 	physical_block_number_t pbn;
3496 	struct dm_io_region regions[1];
3497 	struct slab_depot *depot = allocator->depot;
3498 	block_count_t blocks = depot->slab_config.slab_journal_blocks;
3499 
3500 	if (allocator->slabs_to_erase.next == NULL) {
3501 		vdo_finish_completion(&allocator->completion);
3502 		return;
3503 	}
3504 
3505 	slab = next_slab(&allocator->slabs_to_erase);
3506 	pbn = slab->journal_origin - depot->vdo->geometry.bio_offset;
3507 	regions[0] = (struct dm_io_region) {
3508 		.bdev = vdo_get_backing_device(depot->vdo),
3509 		.sector = pbn * VDO_SECTORS_PER_BLOCK,
3510 		.count = blocks * VDO_SECTORS_PER_BLOCK,
3511 	};
3512 	dm_kcopyd_zero(allocator->eraser, 1, regions, 0, copy_callback, allocator);
3513 }
3514 
3515 /* Implements vdo_admin_initiator_fn. */
3516 static void initiate_load(struct admin_state *state)
3517 {
3518 	struct block_allocator *allocator =
3519 		container_of(state, struct block_allocator, state);
3520 	const struct admin_state_code *operation = vdo_get_admin_state_code(state);
3521 
3522 	if (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD) {
3523 		/*
3524 		 * Must requeue because the kcopyd client cannot be freed in the same stack frame
3525 		 * as the kcopyd callback, lest it deadlock.
3526 		 */
3527 		vdo_prepare_completion_for_requeue(&allocator->completion,
3528 						   finish_loading_allocator,
3529 						   handle_operation_error,
3530 						   allocator->thread_id, NULL);
3531 		allocator->eraser = dm_kcopyd_client_create(NULL);
3532 		if (IS_ERR(allocator->eraser)) {
3533 			vdo_fail_completion(&allocator->completion,
3534 					    PTR_ERR(allocator->eraser));
3535 			allocator->eraser = NULL;
3536 			return;
3537 		}
3538 		allocator->slabs_to_erase = get_slab_iterator(allocator);
3539 
3540 		erase_next_slab_journal(allocator);
3541 		return;
3542 	}
3543 
3544 	apply_to_slabs(allocator, finish_loading_allocator);
3545 }
3546 
3547 /**
3548  * vdo_notify_slab_journals_are_recovered() - Inform a block allocator that its slab journals have
3549  *                                            been recovered from the recovery journal.
3550  * @completion: The allocator completion.
3551  */
3552 void vdo_notify_slab_journals_are_recovered(struct vdo_completion *completion)
3553 {
3554 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
3555 
3556 	vdo_finish_loading_with_result(&allocator->state, completion->result);
3557 }
3558 
3559 static int get_slab_statuses(struct block_allocator *allocator,
3560 			     struct slab_status **statuses_ptr)
3561 {
3562 	int result;
3563 	struct slab_status *statuses;
3564 	struct slab_iterator iterator = get_slab_iterator(allocator);
3565 
3566 	result = vdo_allocate(allocator->slab_count, struct slab_status, __func__,
3567 			      &statuses);
3568 	if (result != VDO_SUCCESS)
3569 		return result;
3570 
3571 	*statuses_ptr = statuses;
3572 
3573 	while (iterator.next != NULL)  {
3574 		slab_count_t slab_number = next_slab(&iterator)->slab_number;
3575 
3576 		*statuses++ = (struct slab_status) {
3577 			.slab_number = slab_number,
3578 			.is_clean = !allocator->summary_entries[slab_number].is_dirty,
3579 			.emptiness = allocator->summary_entries[slab_number].fullness_hint,
3580 		};
3581 	}
3582 
3583 	return VDO_SUCCESS;
3584 }
3585 
3586 /* Prepare slabs for allocation or scrubbing. */
3587 static int __must_check vdo_prepare_slabs_for_allocation(struct block_allocator *allocator)
3588 {
3589 	struct slab_status current_slab_status;
3590 	DEFINE_MIN_HEAP(struct slab_status, heap) heap;
3591 	int result;
3592 	struct slab_status *slab_statuses;
3593 	struct slab_depot *depot = allocator->depot;
3594 
3595 	WRITE_ONCE(allocator->allocated_blocks,
3596 		   allocator->slab_count * depot->slab_config.data_blocks);
3597 	result = get_slab_statuses(allocator, &slab_statuses);
3598 	if (result != VDO_SUCCESS)
3599 		return result;
3600 
3601 	/* Sort the slabs by cleanliness, then by emptiness hint. */
3602 	heap = (struct heap) {
3603 		.data = slab_statuses,
3604 		.nr = allocator->slab_count,
3605 		.size = allocator->slab_count,
3606 	};
3607 	min_heapify_all(&heap, &slab_status_min_heap, NULL);
3608 
3609 	while (heap.nr > 0) {
3610 		bool high_priority;
3611 		struct vdo_slab *slab;
3612 		struct slab_journal *journal;
3613 
3614 		current_slab_status = slab_statuses[0];
3615 		min_heap_pop(&heap, &slab_status_min_heap, NULL);
3616 		slab = depot->slabs[current_slab_status.slab_number];
3617 
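		/*
		 * During a rebuild load, or when a clean slab's reference counts need not be
		 * loaded, the slab can be queued for allocation directly; everything else must
		 * be scrubbed first.
		 */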
3618 		if ((depot->load_type == VDO_SLAB_DEPOT_REBUILD_LOAD) ||
3619 		    (!allocator->summary_entries[slab->slab_number].load_ref_counts &&
3620 		     current_slab_status.is_clean)) {
3621 			queue_slab(slab);
3622 			continue;
3623 		}
3624 
3625 		slab->status = VDO_SLAB_REQUIRES_SCRUBBING;
3626 		journal = &slab->journal;
3627 		high_priority = ((current_slab_status.is_clean &&
3628 				 (depot->load_type == VDO_SLAB_DEPOT_NORMAL_LOAD)) ||
3629 				 (journal_length(journal) >= journal->scrubbing_threshold));
3630 		register_slab_for_scrubbing(slab, high_priority);
3631 	}
3632 
3633 	vdo_free(slab_statuses);
3634 	return VDO_SUCCESS;
3635 }
3636 
3637 static const char *status_to_string(enum slab_rebuild_status status)
3638 {
3639 	switch (status) {
3640 	case VDO_SLAB_REBUILT:
3641 		return "REBUILT";
3642 	case VDO_SLAB_REQUIRES_SCRUBBING:
3643 		return "SCRUBBING";
3644 	case VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING:
3645 		return "PRIORITY_SCRUBBING";
3646 	case VDO_SLAB_REBUILDING:
3647 		return "REBUILDING";
3648 	case VDO_SLAB_REPLAYING:
3649 		return "REPLAYING";
3650 	default:
3651 		return "UNKNOWN";
3652 	}
3653 }
3654 
3655 void vdo_dump_block_allocator(const struct block_allocator *allocator)
3656 {
3657 	unsigned int pause_counter = 0;
3658 	struct slab_iterator iterator = get_slab_iterator(allocator);
3659 	const struct slab_scrubber *scrubber = &allocator->scrubber;
3660 
3661 	vdo_log_info("block_allocator zone %u", allocator->zone_number);
3662 	while (iterator.next != NULL) {
3663 		struct vdo_slab *slab = next_slab(&iterator);
3664 		struct slab_journal *journal = &slab->journal;
3665 
3666 		if (slab->reference_blocks != NULL) {
3667 			/* Terse because there are a lot of slabs to dump and syslog is lossy. */
3668 			vdo_log_info("slab %u: P%u, %llu free", slab->slab_number,
3669 				     slab->priority,
3670 				     (unsigned long long) slab->free_blocks);
3671 		} else {
3672 			vdo_log_info("slab %u: status %s", slab->slab_number,
3673 				     status_to_string(slab->status));
3674 		}
3675 
3676 		vdo_log_info("  slab journal: entry_waiters=%zu waiting_to_commit=%s updating_slab_summary=%s head=%llu unreapable=%llu tail=%llu next_commit=%llu summarized=%llu last_summarized=%llu recovery_lock=%llu dirty=%s",
3677 			     vdo_waitq_num_waiters(&journal->entry_waiters),
3678 			     vdo_bool_to_string(journal->waiting_to_commit),
3679 			     vdo_bool_to_string(journal->updating_slab_summary),
3680 			     (unsigned long long) journal->head,
3681 			     (unsigned long long) journal->unreapable,
3682 			     (unsigned long long) journal->tail,
3683 			     (unsigned long long) journal->next_commit,
3684 			     (unsigned long long) journal->summarized,
3685 			     (unsigned long long) journal->last_summarized,
3686 			     (unsigned long long) journal->recovery_lock,
3687 			     vdo_bool_to_string(journal->recovery_lock != 0));
3688 		/*
3689 		 * Given the frequency with which the locks are just a tiny bit off, it might be
3690 		 * worth dumping all the locks, but that might be too much logging.
3691 		 */
3692 
3693 		if (slab->counters != NULL) {
3694 			/* Terse because there are a lot of slabs to dump and syslog is lossy. */
3695 			vdo_log_info("  slab: free=%u/%u blocks=%u dirty=%zu active=%zu journal@(%llu,%u)",
3696 				     slab->free_blocks, slab->block_count,
3697 				     slab->reference_block_count,
3698 				     vdo_waitq_num_waiters(&slab->dirty_blocks),
3699 				     slab->active_count,
3700 				     (unsigned long long) slab->slab_journal_point.sequence_number,
3701 				     slab->slab_journal_point.entry_count);
3702 		} else {
3703 			vdo_log_info("  no counters");
3704 		}
3705 
3706 		/*
3707 		 * Pause briefly after each batch of 32 slabs dumped (an arbitrary number), giving
3708 		 * the kernel log a chance to be flushed instead of being overrun.
3709 		 */
3710 		if (pause_counter++ == 31) {
3711 			pause_counter = 0;
3712 			vdo_pause_for_logger();
3713 		}
3714 	}
3715 
3716 	vdo_log_info("slab_scrubber slab_count %u waiters %zu %s%s",
3717 		     READ_ONCE(scrubber->slab_count),
3718 		     vdo_waitq_num_waiters(&scrubber->waiters),
3719 		     vdo_get_admin_state_code(&scrubber->admin_state)->name,
3720 		     scrubber->high_priority_only ? ", high_priority_only " : "");
3721 }
3722 
3723 static void free_slab(struct vdo_slab *slab)
3724 {
3725 	if (slab == NULL)
3726 		return;
3727 
3728 	list_del(&slab->allocq_entry);
3729 	vdo_free(vdo_forget(slab->journal.block));
3730 	vdo_free(vdo_forget(slab->journal.locks));
3731 	vdo_free(vdo_forget(slab->counters));
3732 	vdo_free(vdo_forget(slab->reference_blocks));
3733 	vdo_free(slab);
3734 }
3735 
3736 static int initialize_slab_journal(struct vdo_slab *slab)
3737 {
3738 	struct slab_journal *journal = &slab->journal;
3739 	const struct slab_config *slab_config = &slab->allocator->depot->slab_config;
3740 	int result;
3741 
3742 	result = vdo_allocate(slab_config->slab_journal_blocks, struct journal_lock,
3743 			      __func__, &journal->locks);
3744 	if (result != VDO_SUCCESS)
3745 		return result;
3746 
3747 	result = vdo_allocate(VDO_BLOCK_SIZE, char, "struct packed_slab_journal_block",
3748 			      (char **) &journal->block);
3749 	if (result != VDO_SUCCESS)
3750 		return result;
3751 
3752 	journal->slab = slab;
3753 	journal->size = slab_config->slab_journal_blocks;
3754 	journal->flushing_threshold = slab_config->slab_journal_flushing_threshold;
3755 	journal->blocking_threshold = slab_config->slab_journal_blocking_threshold;
3756 	journal->scrubbing_threshold = slab_config->slab_journal_scrubbing_threshold;
3757 	journal->entries_per_block = VDO_SLAB_JOURNAL_ENTRIES_PER_BLOCK;
3758 	journal->full_entries_per_block = VDO_SLAB_JOURNAL_FULL_ENTRIES_PER_BLOCK;
3759 	journal->events = &slab->allocator->slab_journal_statistics;
3760 	journal->recovery_journal = slab->allocator->depot->vdo->recovery_journal;
3761 	journal->tail = 1;
3762 	journal->head = 1;
3763 
3764 	journal->flushing_deadline = journal->flushing_threshold;
3765 	/*
3766 	 * Leave some time between the flushing deadline and the blocking threshold, so that
3767 	 * hopefully all the flushing is done before the journal has to block.
3768 	 */
3769 	if ((journal->blocking_threshold - journal->flushing_threshold) > 5)
3770 		journal->flushing_deadline = journal->blocking_threshold - 5;
3771 
3772 	journal->slab_summary_waiter.callback = release_journal_locks;
3773 
3774 	INIT_LIST_HEAD(&journal->dirty_entry);
3775 	INIT_LIST_HEAD(&journal->uncommitted_blocks);
3776 
3777 	journal->tail_header.nonce = slab->allocator->nonce;
3778 	journal->tail_header.metadata_type = VDO_METADATA_SLAB_JOURNAL;
3779 	initialize_journal_state(journal);
3780 	return VDO_SUCCESS;
3781 }
3782 
3783 /**
3784  * make_slab() - Construct a new, empty slab.
3785  * @slab_origin: The physical block number within the block allocator partition of the first block
3786  *               in the slab.
3787  * @allocator: The block allocator to which the slab belongs.
3788  * @slab_number: The slab number of the slab.
3789  * @is_new: True if this slab is being allocated as part of a resize.
3790  * @slab_ptr: A pointer to receive the new slab.
3791  *
3792  * Return: VDO_SUCCESS or an error code.
3793  */
3794 static int __must_check make_slab(physical_block_number_t slab_origin,
3795 				  struct block_allocator *allocator,
3796 				  slab_count_t slab_number, bool is_new,
3797 				  struct vdo_slab **slab_ptr)
3798 {
3799 	const struct slab_config *slab_config = &allocator->depot->slab_config;
3800 	struct vdo_slab *slab;
3801 	int result;
3802 
3803 	result = vdo_allocate(1, struct vdo_slab, __func__, &slab);
3804 	if (result != VDO_SUCCESS)
3805 		return result;
3806 
3807 	*slab = (struct vdo_slab) {
3808 		.allocator = allocator,
3809 		.start = slab_origin,
3810 		.end = slab_origin + slab_config->slab_blocks,
3811 		.slab_number = slab_number,
3812 		.ref_counts_origin = slab_origin + slab_config->data_blocks,
3813 		.journal_origin =
3814 			vdo_get_slab_journal_start_block(slab_config, slab_origin),
3815 		.block_count = slab_config->data_blocks,
3816 		.free_blocks = slab_config->data_blocks,
3817 		.reference_block_count =
3818 			vdo_get_saved_reference_count_size(slab_config->data_blocks),
3819 	};
3820 	INIT_LIST_HEAD(&slab->allocq_entry);
3821 
3822 	result = initialize_slab_journal(slab);
3823 	if (result != VDO_SUCCESS) {
3824 		free_slab(slab);
3825 		return result;
3826 	}
3827 
3828 	if (is_new) {
3829 		vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NEW);
3830 		result = allocate_slab_counters(slab);
3831 		if (result != VDO_SUCCESS) {
3832 			free_slab(slab);
3833 			return result;
3834 		}
3835 	} else {
3836 		vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
3837 	}
3838 
3839 	*slab_ptr = slab;
3840 	return VDO_SUCCESS;
3841 }
3842 
3843 /**
3844  * allocate_slabs() - Allocate a new slab pointer array.
3845  * @depot: The depot.
3846  * @slab_count: The number of slabs the depot should have in the new array.
3847  *
3848  * Any existing slab pointers will be copied into the new array, and slabs will be allocated as
3849  * needed. The newly allocated slabs will not be distributed for use by the block allocators.
3850  *
3851  * Return: VDO_SUCCESS or an error code.
3852  */
3853 static int allocate_slabs(struct slab_depot *depot, slab_count_t slab_count)
3854 {
3855 	block_count_t slab_size;
3856 	bool resizing = false;
3857 	physical_block_number_t slab_origin;
3858 	int result;
3859 
3860 	result = vdo_allocate(slab_count, struct vdo_slab *,
3861 			      "slab pointer array", &depot->new_slabs);
3862 	if (result != VDO_SUCCESS)
3863 		return result;
3864 
3865 	if (depot->slabs != NULL) {
3866 		memcpy(depot->new_slabs, depot->slabs,
3867 		       depot->slab_count * sizeof(struct vdo_slab *));
3868 		resizing = true;
3869 	}
3870 
3871 	slab_size = depot->slab_config.slab_blocks;
3872 	slab_origin = depot->first_block + (depot->slab_count * slab_size);
3873 
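	/*
	 * New slabs are laid out contiguously after the existing ones, one slab_size apart,
	 * and are assigned to block allocators round-robin by slab number.
	 */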
3874 	for (depot->new_slab_count = depot->slab_count;
3875 	     depot->new_slab_count < slab_count;
3876 	     depot->new_slab_count++, slab_origin += slab_size) {
3877 		struct block_allocator *allocator =
3878 			&depot->allocators[depot->new_slab_count % depot->zone_count];
3879 		struct vdo_slab **slab_ptr = &depot->new_slabs[depot->new_slab_count];
3880 
3881 		result = make_slab(slab_origin, allocator, depot->new_slab_count,
3882 				   resizing, slab_ptr);
3883 		if (result != VDO_SUCCESS)
3884 			return result;
3885 	}
3886 
3887 	return VDO_SUCCESS;
3888 }
3889 
3890 /**
3891  * vdo_abandon_new_slabs() - Abandon any new slabs in this depot, freeing them as needed.
3892  * @depot: The depot.
3893  */
3894 void vdo_abandon_new_slabs(struct slab_depot *depot)
3895 {
3896 	slab_count_t i;
3897 
3898 	if (depot->new_slabs == NULL)
3899 		return;
3900 
3901 	for (i = depot->slab_count; i < depot->new_slab_count; i++)
3902 		free_slab(vdo_forget(depot->new_slabs[i]));
3903 	depot->new_slab_count = 0;
3904 	depot->new_size = 0;
3905 	vdo_free(vdo_forget(depot->new_slabs));
3906 }
3907 
3908 /** Implements vdo_zone_thread_getter_fn. */
3909 static thread_id_t get_allocator_thread_id(void *context, zone_count_t zone_number)
3910 {
3911 	return ((struct slab_depot *) context)->allocators[zone_number].thread_id;
3912 }
3913 
3914 /**
3915  * release_recovery_journal_lock() - Request the slab journal to release the recovery journal lock
3916  *                                   it may hold on a specified recovery journal block.
3917  * @journal: The slab journal.
3918  * @recovery_lock: The sequence number of the recovery journal block whose locks should be
3919  *                 released.
3920  *
3921  * Return: True if the journal released a lock on the specified block.
3922  */
3923 static bool __must_check release_recovery_journal_lock(struct slab_journal *journal,
3924 						       sequence_number_t recovery_lock)
3925 {
3926 	if (recovery_lock > journal->recovery_lock) {
3927 		VDO_ASSERT_LOG_ONLY((recovery_lock < journal->recovery_lock),
3928 				    "slab journal recovery lock is not older than the recovery journal head");
3929 		return false;
3930 	}
3931 
3932 	if ((recovery_lock < journal->recovery_lock) ||
3933 	    vdo_is_read_only(journal->slab->allocator->depot->vdo))
3934 		return false;
3935 
3936 	/* All locks are held by the block which is in progress; write it. */
3937 	commit_tail(journal);
3938 	return true;
3939 }
3940 
3941 /*
3942  * Request a commit of all dirty tail blocks which are locking the recovery journal block the depot
3943  * is seeking to release.
3944  *
3945  * Implements vdo_zone_action_fn.
3946  */
3947 static void release_tail_block_locks(void *context, zone_count_t zone_number,
3948 				     struct vdo_completion *parent)
3949 {
3950 	struct slab_journal *journal, *tmp;
3951 	struct slab_depot *depot = context;
3952 	struct list_head *list = &depot->allocators[zone_number].dirty_slab_journals;
3953 
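	/*
	 * The dirty list is kept sorted by recovery lock, so once one journal declines to
	 * release its lock, no later journal on the list will release either.
	 */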
3954 	list_for_each_entry_safe(journal, tmp, list, dirty_entry) {
3955 		if (!release_recovery_journal_lock(journal,
3956 						   depot->active_release_request))
3957 			break;
3958 	}
3959 
3960 	vdo_finish_completion(parent);
3961 }
3962 
3963 /**
3964  * prepare_for_tail_block_commit() - Prepare to commit oldest tail blocks.
3965  * @context: The slab depot.
3966  * @parent: The parent operation.
3967  *
3968  * Implements vdo_action_preamble_fn.
3969  */
3970 static void prepare_for_tail_block_commit(void *context, struct vdo_completion *parent)
3971 {
3972 	struct slab_depot *depot = context;
3973 
3974 	depot->active_release_request = depot->new_release_request;
3975 	vdo_finish_completion(parent);
3976 }
3977 
3978 /**
3979  * schedule_tail_block_commit() - Schedule a tail block commit if necessary.
3980  * @context: The slab depot.
3981  *
3982  * This method should not be called directly. Rather, call vdo_schedule_default_action() on the
3983  * depot's action manager.
3984  *
3985  * Implements vdo_action_scheduler_fn.
3986  */
3987 static bool schedule_tail_block_commit(void *context)
3988 {
3989 	struct slab_depot *depot = context;
3990 
3991 	if (depot->new_release_request == depot->active_release_request)
3992 		return false;
3993 
3994 	return vdo_schedule_action(depot->action_manager,
3995 				   prepare_for_tail_block_commit,
3996 				   release_tail_block_locks,
3997 				   NULL, NULL);
3998 }
3999 
4000 /**
4001  * initialize_slab_scrubber() - Initialize an allocator's slab scrubber.
4002  * @allocator: The allocator being initialized
4003  *
4004  * Return: VDO_SUCCESS or an error.
4005  */
4006 static int initialize_slab_scrubber(struct block_allocator *allocator)
4007 {
4008 	struct slab_scrubber *scrubber = &allocator->scrubber;
4009 	block_count_t slab_journal_size =
4010 		allocator->depot->slab_config.slab_journal_blocks;
4011 	char *journal_data;
4012 	int result;
4013 
4014 	result = vdo_allocate(VDO_BLOCK_SIZE * slab_journal_size,
4015 			      char, __func__, &journal_data);
4016 	if (result != VDO_SUCCESS)
4017 		return result;
4018 
4019 	result = allocate_vio_components(allocator->completion.vdo,
4020 					 VIO_TYPE_SLAB_JOURNAL,
4021 					 VIO_PRIORITY_METADATA,
4022 					 allocator, slab_journal_size,
4023 					 journal_data, &scrubber->vio);
4024 	if (result != VDO_SUCCESS) {
4025 		vdo_free(journal_data);
4026 		return result;
4027 	}
4028 
4029 	INIT_LIST_HEAD(&scrubber->high_priority_slabs);
4030 	INIT_LIST_HEAD(&scrubber->slabs);
4031 	vdo_set_admin_state_code(&scrubber->admin_state, VDO_ADMIN_STATE_SUSPENDED);
4032 	return VDO_SUCCESS;
4033 }
4034 
4035 /**
4036  * initialize_slab_summary_block() - Initialize a slab_summary_block.
4037  * @allocator: The allocator which owns the block.
4038  * @index: The index of this block in its zone's summary.
4039  *
4040  * Return: VDO_SUCCESS or an error.
4041  */
4042 static int __must_check initialize_slab_summary_block(struct block_allocator *allocator,
4043 						      block_count_t index)
4044 {
4045 	struct slab_summary_block *block = &allocator->summary_blocks[index];
4046 	int result;
4047 
4048 	result = vdo_allocate(VDO_BLOCK_SIZE, char, __func__, &block->outgoing_entries);
4049 	if (result != VDO_SUCCESS)
4050 		return result;
4051 
4052 	result = allocate_vio_components(allocator->depot->vdo, VIO_TYPE_SLAB_SUMMARY,
4053 					 VIO_PRIORITY_METADATA, NULL, 1,
4054 					 block->outgoing_entries, &block->vio);
4055 	if (result != VDO_SUCCESS)
4056 		return result;
4057 
4058 	block->allocator = allocator;
4059 	block->entries = &allocator->summary_entries[VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK * index];
4060 	block->index = index;
4061 	return VDO_SUCCESS;
4062 }
4063 
4064 static int __must_check initialize_block_allocator(struct slab_depot *depot,
4065 						   zone_count_t zone)
4066 {
4067 	int result;
4068 	block_count_t i;
4069 	struct block_allocator *allocator = &depot->allocators[zone];
4070 	struct vdo *vdo = depot->vdo;
4071 	block_count_t max_free_blocks = depot->slab_config.data_blocks;
4072 	unsigned int max_priority = (2 + ilog2(max_free_blocks));
4073 	u32 reference_block_count, refcount_reads_needed, refcount_blocks_per_vio;
4074 
4075 	*allocator = (struct block_allocator) {
4076 		.depot = depot,
4077 		.zone_number = zone,
4078 		.thread_id = vdo->thread_config.physical_threads[zone],
4079 		.nonce = vdo->states.vdo.nonce,
4080 	};
4081 
4082 	INIT_LIST_HEAD(&allocator->dirty_slab_journals);
4083 	vdo_set_admin_state_code(&allocator->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
4084 	result = vdo_register_read_only_listener(vdo, allocator,
4085 						 notify_block_allocator_of_read_only_mode,
4086 						 allocator->thread_id);
4087 	if (result != VDO_SUCCESS)
4088 		return result;
4089 
4090 	vdo_initialize_completion(&allocator->completion, vdo, VDO_BLOCK_ALLOCATOR_COMPLETION);
4091 	result = make_vio_pool(vdo, BLOCK_ALLOCATOR_VIO_POOL_SIZE, 1, allocator->thread_id,
4092 			       VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA,
4093 			       allocator, &allocator->vio_pool);
4094 	if (result != VDO_SUCCESS)
4095 		return result;
4096 
4097 	/* Initialize the refcount-reading vio pool. */
4098 	reference_block_count = vdo_get_saved_reference_count_size(depot->slab_config.slab_blocks);
4099 	refcount_reads_needed = DIV_ROUND_UP(reference_block_count, MAX_BLOCKS_PER_VIO);
4100 	refcount_blocks_per_vio = DIV_ROUND_UP(reference_block_count, refcount_reads_needed);
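	/*
	 * For example, if a slab had 25 reference blocks and MAX_BLOCKS_PER_VIO were 8, this
	 * would use 4 reads of at most 7 blocks each rather than 3 full vios plus one
	 * single-block read.
	 */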
4101 	allocator->refcount_blocks_per_big_vio = refcount_blocks_per_vio;
4102 	result = make_vio_pool(vdo, BLOCK_ALLOCATOR_REFCOUNT_VIO_POOL_SIZE,
4103 			       allocator->refcount_blocks_per_big_vio, allocator->thread_id,
4104 			       VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA,
4105 			       NULL, &allocator->refcount_big_vio_pool);
4106 	if (result != VDO_SUCCESS)
4107 		return result;
4108 
4109 	result = initialize_slab_scrubber(allocator);
4110 	if (result != VDO_SUCCESS)
4111 		return result;
4112 
4113 	result = vdo_make_priority_table(max_priority, &allocator->prioritized_slabs);
4114 	if (result != VDO_SUCCESS)
4115 		return result;
4116 
4117 	result = vdo_allocate(VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE,
4118 			      struct slab_summary_block, __func__,
4119 			      &allocator->summary_blocks);
4120 	if (result != VDO_SUCCESS)
4121 		return result;
4122 
4123 	vdo_set_admin_state_code(&allocator->summary_state,
4124 				 VDO_ADMIN_STATE_NORMAL_OPERATION);
4125 	allocator->summary_entries = depot->summary_entries + (MAX_VDO_SLABS * zone);
4126 
4127 	/* Initialize each summary block. */
4128 	for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4129 		result = initialize_slab_summary_block(allocator, i);
4130 		if (result != VDO_SUCCESS)
4131 			return result;
4132 	}
4133 
4134 	/*
4135 	 * Performing well atop thin provisioned storage requires either that VDO discards freed
4136 	 * blocks, or that the block allocator try to use slabs that already have allocated blocks
4137 	 * in preference to slabs that have never been opened. For reasons we have not been able to
4138 	 * fully understand, some SSD machines have been very sensitive (50% reduction in
4139 	 * test throughput) to very slight differences in the timing and locality of block
4140 	 * allocation. Assigning a low priority to unopened slabs (max_priority/2, say) would be
4141 	 * ideal in principle, but anything less than a very high threshold (max_priority - 1)
4142 	 * hurts on these machines.
4143 	 *
4144 	 * This sets the free block threshold for preferring to open an unopened slab to the binary
4145 	 * floor of 3/4ths the total number of data blocks in a slab, which will generally evaluate
4146 	 * to about half the slab size.
4147 	 */
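	/*
	 * For example, with 8192 data blocks per slab, (8192 * 3) / 4 is 6144 and ilog2(6144)
	 * is 12, so unopened slabs would get priority 13.
	 */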
4148 	allocator->unopened_slab_priority = (1 + ilog2((max_free_blocks * 3) / 4));
4149 
4150 	return VDO_SUCCESS;
4151 }
4152 
4153 static int allocate_components(struct slab_depot *depot,
4154 			       struct partition *summary_partition)
4155 {
4156 	int result;
4157 	zone_count_t zone;
4158 	slab_count_t slab_count;
4159 	u8 hint;
4160 	u32 i;
4161 	const struct thread_config *thread_config = &depot->vdo->thread_config;
4162 
4163 	result = vdo_make_action_manager(depot->zone_count, get_allocator_thread_id,
4164 					 thread_config->journal_thread, depot,
4165 					 schedule_tail_block_commit,
4166 					 depot->vdo, &depot->action_manager);
4167 	if (result != VDO_SUCCESS)
4168 		return result;
4169 
4170 	depot->origin = depot->first_block;
4171 
4172 	/* block size must be a multiple of entry size */
4173 	BUILD_BUG_ON((VDO_BLOCK_SIZE % sizeof(struct slab_summary_entry)) != 0);
4174 
4175 	depot->summary_origin = summary_partition->offset;
4176 	depot->hint_shift = vdo_get_slab_summary_hint_shift(depot->slab_size_shift);
4177 	result = vdo_allocate(MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES,
4178 			      struct slab_summary_entry, __func__,
4179 			      &depot->summary_entries);
4180 	if (result != VDO_SUCCESS)
4181 		return result;
4182 
4183 
4184 	/* Initialize all the entries. */
4185 	hint = compute_fullness_hint(depot, depot->slab_config.data_blocks);
4186 	for (i = 0; i < MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES; i++) {
4187 		/*
4188 		 * This default tail block offset must be reflected in
4189 		 * slabJournal.c::read_slab_journal_tail().
4190 		 */
4191 		depot->summary_entries[i] = (struct slab_summary_entry) {
4192 			.tail_block_offset = 0,
4193 			.fullness_hint = hint,
4194 			.load_ref_counts = false,
4195 			.is_dirty = false,
4196 		};
4197 	}
4198 
4199 	slab_count = vdo_compute_slab_count(depot->first_block, depot->last_block,
4200 					    depot->slab_size_shift);
4201 	if (thread_config->physical_zone_count > slab_count) {
4202 		return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
4203 					      "%u physical zones exceeds slab count %u",
4204 					      thread_config->physical_zone_count,
4205 					      slab_count);
4206 	}
4207 
4208 	/* Initialize the block allocators. */
4209 	for (zone = 0; zone < depot->zone_count; zone++) {
4210 		result = initialize_block_allocator(depot, zone);
4211 		if (result != VDO_SUCCESS)
4212 			return result;
4213 	}
4214 
4215 	/* Allocate slabs. */
4216 	result = allocate_slabs(depot, slab_count);
4217 	if (result != VDO_SUCCESS)
4218 		return result;
4219 
4220 	/* Use the new slabs. */
4221 	for (i = depot->slab_count; i < depot->new_slab_count; i++) {
4222 		struct vdo_slab *slab = depot->new_slabs[i];
4223 
4224 		register_slab_with_allocator(slab->allocator, slab);
4225 		WRITE_ONCE(depot->slab_count, depot->slab_count + 1);
4226 	}
4227 
4228 	depot->slabs = depot->new_slabs;
4229 	depot->new_slabs = NULL;
4230 	depot->new_slab_count = 0;
4231 
4232 	return VDO_SUCCESS;
4233 }
4234 
4235 /**
4236  * vdo_decode_slab_depot() - Make a slab depot and configure it with the state read from the super
4237  *                           block.
4238  * @state: The slab depot state from the super block.
4239  * @vdo: The VDO which will own the depot.
4240  * @summary_partition: The partition which holds the slab summary.
4241  * @depot_ptr: A pointer to hold the depot.
4242  *
4243  * Return: A success or error code.
4244  */
vdo_decode_slab_depot(struct slab_depot_state_2_0 state,struct vdo * vdo,struct partition * summary_partition,struct slab_depot ** depot_ptr)4245 int vdo_decode_slab_depot(struct slab_depot_state_2_0 state, struct vdo *vdo,
4246 			  struct partition *summary_partition,
4247 			  struct slab_depot **depot_ptr)
4248 {
4249 	unsigned int slab_size_shift;
4250 	struct slab_depot *depot;
4251 	int result;
4252 
4253 	/*
4254 	 * Calculate the bit shift for efficiently mapping block numbers to slabs. Using a shift
4255 	 * requires that the slab size be a power of two.
4256 	 */
4257 	block_count_t slab_size = state.slab_config.slab_blocks;
4258 
4259 	if (!is_power_of_2(slab_size)) {
4260 		return vdo_log_error_strerror(UDS_INVALID_ARGUMENT,
4261 					      "slab size must be a power of two");
4262 	}
4263 	slab_size_shift = ilog2(slab_size);
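	/*
	 * Example with a hypothetical slab size: slab_blocks = 32768 = 2^15 yields
	 * slab_size_shift = 15, so the PBN-to-slab mapping in get_slab_number()
	 * reduces to a shift rather than a division.
	 */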
4264 
4265 	result = vdo_allocate_extended(struct slab_depot,
4266 				       vdo->thread_config.physical_zone_count,
4267 				       struct block_allocator, __func__, &depot);
4268 	if (result != VDO_SUCCESS)
4269 		return result;
4270 
4271 	depot->vdo = vdo;
4272 	depot->old_zone_count = state.zone_count;
4273 	depot->zone_count = vdo->thread_config.physical_zone_count;
4274 	depot->slab_config = state.slab_config;
4275 	depot->first_block = state.first_block;
4276 	depot->last_block = state.last_block;
4277 	depot->slab_size_shift = slab_size_shift;
4278 
4279 	result = allocate_components(depot, summary_partition);
4280 	if (result != VDO_SUCCESS) {
4281 		vdo_free_slab_depot(depot);
4282 		return result;
4283 	}
4284 
4285 	*depot_ptr = depot;
4286 	return VDO_SUCCESS;
4287 }
4288 
uninitialize_allocator_summary(struct block_allocator * allocator)4289 static void uninitialize_allocator_summary(struct block_allocator *allocator)
4290 {
4291 	block_count_t i;
4292 
4293 	if (allocator->summary_blocks == NULL)
4294 		return;
4295 
4296 	for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4297 		free_vio_components(&allocator->summary_blocks[i].vio);
4298 		vdo_free(vdo_forget(allocator->summary_blocks[i].outgoing_entries));
4299 	}
4300 
4301 	vdo_free(vdo_forget(allocator->summary_blocks));
4302 }
4303 
4304 /**
4305  * vdo_free_slab_depot() - Destroy a slab depot.
4306  * @depot: The depot to destroy.
4307  */
vdo_free_slab_depot(struct slab_depot * depot)4308 void vdo_free_slab_depot(struct slab_depot *depot)
4309 {
4310 	zone_count_t zone = 0;
4311 
4312 	if (depot == NULL)
4313 		return;
4314 
4315 	vdo_abandon_new_slabs(depot);
4316 
4317 	for (zone = 0; zone < depot->zone_count; zone++) {
4318 		struct block_allocator *allocator = &depot->allocators[zone];
4319 
4320 		if (allocator->eraser != NULL)
4321 			dm_kcopyd_client_destroy(vdo_forget(allocator->eraser));
4322 
4323 		uninitialize_allocator_summary(allocator);
4324 		uninitialize_scrubber_vio(&allocator->scrubber);
4325 		free_vio_pool(vdo_forget(allocator->vio_pool));
4326 		free_vio_pool(vdo_forget(allocator->refcount_big_vio_pool));
4327 		vdo_free_priority_table(vdo_forget(allocator->prioritized_slabs));
4328 	}
4329 
4330 	if (depot->slabs != NULL) {
4331 		slab_count_t i;
4332 
4333 		for (i = 0; i < depot->slab_count; i++)
4334 			free_slab(vdo_forget(depot->slabs[i]));
4335 	}
4336 
4337 	vdo_free(vdo_forget(depot->slabs));
4338 	vdo_free(vdo_forget(depot->action_manager));
4339 	vdo_free(vdo_forget(depot->summary_entries));
4340 	vdo_free(depot);
4341 }
4342 
4343 /**
4344  * vdo_record_slab_depot() - Record the state of a slab depot for encoding into the super block.
4345  * @depot: The depot to encode.
4346  *
4347  * Return: The depot state.
4348  */
vdo_record_slab_depot(const struct slab_depot * depot)4349 struct slab_depot_state_2_0 vdo_record_slab_depot(const struct slab_depot *depot)
4350 {
4351 	/*
4352 	 * If this depot is currently using 0 zones, it must have been synchronously loaded by a
4353 	 * tool and is now being saved. Since we did not load and combine the slab summary, record
4354 	 * the old zone count rather than 0 so that the summary is combined on the next real load.
4355 	 */
4356 	struct slab_depot_state_2_0 state;
4357 	zone_count_t zones_to_record = depot->zone_count;
4358 
4359 	if (depot->zone_count == 0)
4360 		zones_to_record = depot->old_zone_count;
4361 
4362 	state = (struct slab_depot_state_2_0) {
4363 		.slab_config = depot->slab_config,
4364 		.first_block = depot->first_block,
4365 		.last_block = depot->last_block,
4366 		.zone_count = zones_to_record,
4367 	};
4368 
4369 	return state;
4370 }
4371 
4372 /**
4373  * vdo_allocate_reference_counters() - Allocate the reference counters for all slabs in the depot.
4374  * @depot: The slab depot.
4375  *
4376  * Context: This method may be called only before entering normal operation from the load thread.
4377  *
4378  * Return: VDO_SUCCESS or an error.
4379  */
vdo_allocate_reference_counters(struct slab_depot * depot)4380 int vdo_allocate_reference_counters(struct slab_depot *depot)
4381 {
4382 	struct slab_iterator iterator =
4383 		get_depot_slab_iterator(depot, depot->slab_count - 1, 0, 1);
4384 
4385 	while (iterator.next != NULL) {
4386 		int result = allocate_slab_counters(next_slab(&iterator));
4387 
4388 		if (result != VDO_SUCCESS)
4389 			return result;
4390 	}
4391 
4392 	return VDO_SUCCESS;
4393 }
4394 
4395 /**
4396  * get_slab_number() - Get the number of the slab that contains a specified block.
4397  * @depot: The slab depot.
4398  * @pbn: The physical block number.
4399  * @slab_number_ptr: A pointer to hold the slab number.
4400  *
4401  * Return: VDO_SUCCESS or an error.
4402  */
get_slab_number(const struct slab_depot * depot,physical_block_number_t pbn,slab_count_t * slab_number_ptr)4403 static int __must_check get_slab_number(const struct slab_depot *depot,
4404 					physical_block_number_t pbn,
4405 					slab_count_t *slab_number_ptr)
4406 {
4407 	slab_count_t slab_number;
4408 
4409 	if (pbn < depot->first_block)
4410 		return VDO_OUT_OF_RANGE;
4411 
4412 	slab_number = (pbn - depot->first_block) >> depot->slab_size_shift;
4413 	if (slab_number >= depot->slab_count)
4414 		return VDO_OUT_OF_RANGE;
4415 
4416 	*slab_number_ptr = slab_number;
4417 	return VDO_SUCCESS;
4418 }
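
/*
 * Worked example for the mapping above, with hypothetical numbers: if first_block is
 * 1024 and slab_size_shift is 15, then pbn 100000 maps to (100000 - 1024) >> 15 = 3,
 * i.e. slab 3, provided the depot has at least 4 slabs; otherwise VDO_OUT_OF_RANGE
 * is returned.
 */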
4419 
4420 /**
4421  * vdo_get_slab() - Get the slab object for the slab that contains a specified block.
4422  * @depot: The slab depot.
4423  * @pbn: The physical block number.
4424  *
4425  * Will put the VDO in read-only mode if the PBN is neither a valid data block nor the zero block.
4426  *
4427  * Return: The slab containing the block, or NULL if the block number is the zero block or
4428  * otherwise out of range.
4429  */
vdo_get_slab(const struct slab_depot * depot,physical_block_number_t pbn)4430 struct vdo_slab *vdo_get_slab(const struct slab_depot *depot,
4431 			      physical_block_number_t pbn)
4432 {
4433 	slab_count_t slab_number;
4434 	int result;
4435 
4436 	if (pbn == VDO_ZERO_BLOCK)
4437 		return NULL;
4438 
4439 	result = get_slab_number(depot, pbn, &slab_number);
4440 	if (result != VDO_SUCCESS) {
4441 		vdo_enter_read_only_mode(depot->vdo, result);
4442 		return NULL;
4443 	}
4444 
4445 	return depot->slabs[slab_number];
4446 }
4447 
4448 /**
4449  * vdo_get_increment_limit() - Determine how many new references a block can acquire.
4450  * @depot: The slab depot.
4451  * @pbn: The physical block number that is being queried.
4452  *
4453  * Context: This method must be called from the physical zone thread of the PBN.
4454  *
4455  * Return: The number of available references.
4456  */
vdo_get_increment_limit(struct slab_depot * depot,physical_block_number_t pbn)4457 u8 vdo_get_increment_limit(struct slab_depot *depot, physical_block_number_t pbn)
4458 {
4459 	struct vdo_slab *slab = vdo_get_slab(depot, pbn);
4460 	vdo_refcount_t *counter_ptr = NULL;
4461 	int result;
4462 
4463 	if ((slab == NULL) || (slab->status != VDO_SLAB_REBUILT))
4464 		return 0;
4465 
4466 	result = get_reference_counter(slab, pbn, &counter_ptr);
4467 	if (result != VDO_SUCCESS)
4468 		return 0;
4469 
4470 	if (*counter_ptr == PROVISIONAL_REFERENCE_COUNT)
4471 		return (MAXIMUM_REFERENCE_COUNT - 1);
4472 
4473 	return (MAXIMUM_REFERENCE_COUNT - *counter_ptr);
4474 }
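
/*
 * Illustrative arithmetic for vdo_get_increment_limit(), assuming the usual 8-bit
 * reference counts where MAXIMUM_REFERENCE_COUNT is 254: a block with 3 existing
 * references can take 251 more, while a provisionally referenced block is treated
 * as having exactly one reference and so can take 253 more.
 */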
4475 
4476 /**
4477  * vdo_is_physical_data_block() - Determine whether the given PBN refers to a data block.
4478  * @depot: The depot.
4479  * @pbn: The physical block number to ask about.
4480  *
4481  * Return: True if the PBN corresponds to a data block.
4482  */
vdo_is_physical_data_block(const struct slab_depot * depot,physical_block_number_t pbn)4483 bool vdo_is_physical_data_block(const struct slab_depot *depot,
4484 				physical_block_number_t pbn)
4485 {
4486 	slab_count_t slab_number;
4487 	slab_block_number sbn;
4488 
4489 	return ((pbn == VDO_ZERO_BLOCK) ||
4490 		((get_slab_number(depot, pbn, &slab_number) == VDO_SUCCESS) &&
4491 		 (slab_block_number_from_pbn(depot->slabs[slab_number], pbn, &sbn) ==
4492 		  VDO_SUCCESS)));
4493 }
4494 
4495 /**
4496  * vdo_get_slab_depot_allocated_blocks() - Get the total number of data blocks allocated across all
4497  *                                           the slabs in the depot.
4498  * @depot: The slab depot.
4499  *
4500  * This is the total number of blocks with a non-zero reference count.
4501  *
4502  * Context: This may be called from any thread.
4503  *
4504  * Return: The total number of blocks with a non-zero reference count.
4505  */
vdo_get_slab_depot_allocated_blocks(const struct slab_depot * depot)4506 block_count_t vdo_get_slab_depot_allocated_blocks(const struct slab_depot *depot)
4507 {
4508 	block_count_t total = 0;
4509 	zone_count_t zone;
4510 
4511 	for (zone = 0; zone < depot->zone_count; zone++) {
4512 		/* The allocators are responsible for thread safety. */
4513 		total += READ_ONCE(depot->allocators[zone].allocated_blocks);
4514 	}
4515 
4516 	return total;
4517 }
4518 
4519 /**
4520  * vdo_get_slab_depot_data_blocks() - Get the total number of data blocks in all the slabs in the
4521  *                                    depot.
4522  * @depot: The slab depot.
4523  *
4524  * Context: This may be called from any thread.
4525  *
4526  * Return: The total number of data blocks in all slabs.
4527  */
vdo_get_slab_depot_data_blocks(const struct slab_depot * depot)4528 block_count_t vdo_get_slab_depot_data_blocks(const struct slab_depot *depot)
4529 {
4530 	return (READ_ONCE(depot->slab_count) * depot->slab_config.data_blocks);
4531 }
4532 
4533 /**
4534  * finish_combining_zones() - Clean up after saving out the combined slab summary.
4535  * @completion: The vio which was used to write the summary data.
4536  */
finish_combining_zones(struct vdo_completion * completion)4537 static void finish_combining_zones(struct vdo_completion *completion)
4538 {
4539 	int result = completion->result;
4540 	struct vdo_completion *parent = completion->parent;
4541 
4542 	free_vio(as_vio(vdo_forget(completion)));
4543 	vdo_fail_completion(parent, result);
4544 }
4545 
handle_combining_error(struct vdo_completion * completion)4546 static void handle_combining_error(struct vdo_completion *completion)
4547 {
4548 	vio_record_metadata_io_error(as_vio(completion));
4549 	finish_combining_zones(completion);
4550 }
4551 
write_summary_endio(struct bio * bio)4552 static void write_summary_endio(struct bio *bio)
4553 {
4554 	struct vio *vio = bio->bi_private;
4555 	struct vdo *vdo = vio->completion.vdo;
4556 
4557 	continue_vio_after_io(vio, finish_combining_zones,
4558 			      vdo->thread_config.admin_thread);
4559 }
4560 
4561 /**
4562  * combine_summaries() - Treating the current entries buffer as the on-disk value of all zones,
4563  *                       update every zone to the correct values for every slab.
4564  * @depot: The depot whose summary entries should be combined.
4565  */
combine_summaries(struct slab_depot * depot)4566 static void combine_summaries(struct slab_depot *depot)
4567 {
4568 	/*
4569 	 * Combine all the old summary data into the portion of the buffer corresponding to the
4570 	 * first zone.
4571 	 */
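	/*
	 * Illustrative layout, for an old_zone_count of 3: slab n's entry was last
	 * written by zone (n % 3), so slab 0's entry comes from zone 0's region,
	 * slab 1's from zone 1's, slab 2's from zone 2's, slab 3's from zone 0's
	 * again, and so on.
	 */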
4572 	zone_count_t zone = 0;
4573 	struct slab_summary_entry *entries = depot->summary_entries;
4574 
4575 	if (depot->old_zone_count > 1) {
4576 		slab_count_t entry_number;
4577 
4578 		for (entry_number = 0; entry_number < MAX_VDO_SLABS; entry_number++) {
4579 			if (zone != 0) {
4580 				memcpy(entries + entry_number,
4581 				       entries + (zone * MAX_VDO_SLABS) + entry_number,
4582 				       sizeof(struct slab_summary_entry));
4583 			}
4584 
4585 			zone++;
4586 			if (zone == depot->old_zone_count)
4587 				zone = 0;
4588 		}
4589 	}
4590 
4591 	/* Copy the combined data to each zone's region of the buffer. */
4592 	for (zone = 1; zone < MAX_VDO_PHYSICAL_ZONES; zone++) {
4593 		memcpy(entries + (zone * MAX_VDO_SLABS), entries,
4594 		       MAX_VDO_SLABS * sizeof(struct slab_summary_entry));
4595 	}
4596 }
4597 
4598 /**
4599  * finish_loading_summary() - Finish loading slab summary data.
4600  * @completion: The vio which was used to read the summary data.
4601  *
4602  * Combines the slab summary data from all the previously written zones and copies the combined
4603  * summary to each zone's region of the buffer. Then writes the combined summary back out to disk.
4604  * This callback is registered in load_summary_endio().
4605  */
finish_loading_summary(struct vdo_completion * completion)4606 static void finish_loading_summary(struct vdo_completion *completion)
4607 {
4608 	struct slab_depot *depot = completion->vdo->depot;
4609 
4610 	/* Combine the summary from each zone so each zone is correct for all slabs. */
4611 	combine_summaries(depot);
4612 
4613 	/* Write the combined summary back out. */
4614 	vdo_submit_metadata_vio(as_vio(completion), depot->summary_origin,
4615 				write_summary_endio, handle_combining_error,
4616 				REQ_OP_WRITE);
4617 }
4618 
load_summary_endio(struct bio * bio)4619 static void load_summary_endio(struct bio *bio)
4620 {
4621 	struct vio *vio = bio->bi_private;
4622 	struct vdo *vdo = vio->completion.vdo;
4623 
4624 	continue_vio_after_io(vio, finish_loading_summary,
4625 			      vdo->thread_config.admin_thread);
4626 }
4627 
4628 /**
4629  * load_slab_summary() - Load the slab summary before the slab data.
4630  * @context: The slab depot.
4631  * @parent: The load operation.
4632  *
4633  * Implements vdo_action_preamble_fn.
4634  */
load_slab_summary(void * context,struct vdo_completion * parent)4635 static void load_slab_summary(void *context, struct vdo_completion *parent)
4636 {
4637 	int result;
4638 	struct vio *vio;
4639 	struct slab_depot *depot = context;
4640 	const struct admin_state_code *operation =
4641 		vdo_get_current_manager_operation(depot->action_manager);
4642 
4643 	result = create_multi_block_metadata_vio(depot->vdo, VIO_TYPE_SLAB_SUMMARY,
4644 						 VIO_PRIORITY_METADATA, parent,
4645 						 VDO_SLAB_SUMMARY_BLOCKS,
4646 						 (char *) depot->summary_entries, &vio);
4647 	if (result != VDO_SUCCESS) {
4648 		vdo_fail_completion(parent, result);
4649 		return;
4650 	}
4651 
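	/*
	 * When formatting, or loading for a rebuild, the on-disk summary contents
	 * will not be used, so skip the read and go straight to combining and
	 * writing out the in-memory entries.
	 */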
4652 	if ((operation == VDO_ADMIN_STATE_FORMATTING) ||
4653 	    (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD)) {
4654 		finish_loading_summary(&vio->completion);
4655 		return;
4656 	}
4657 
4658 	vdo_submit_metadata_vio(vio, depot->summary_origin, load_summary_endio,
4659 				handle_combining_error, REQ_OP_READ);
4660 }
4661 
4662 /* Implements vdo_zone_action_fn. */
load_allocator(void * context,zone_count_t zone_number,struct vdo_completion * parent)4663 static void load_allocator(void *context, zone_count_t zone_number,
4664 			   struct vdo_completion *parent)
4665 {
4666 	struct slab_depot *depot = context;
4667 
4668 	vdo_start_loading(&depot->allocators[zone_number].state,
4669 			  vdo_get_current_manager_operation(depot->action_manager),
4670 			  parent, initiate_load);
4671 }
4672 
4673 /**
4674  * vdo_load_slab_depot() - Asynchronously load any slab depot state that isn't included in the
4675  *                         super_block component.
4676  * @depot: The depot to load.
4677  * @operation: The type of load to perform.
4678  * @parent: The completion to notify when the load is complete.
4679  * @context: Additional context for the load operation; may be NULL.
4680  *
4681  * This method may be called only before entering normal operation from the load thread.
4682  */
vdo_load_slab_depot(struct slab_depot * depot,const struct admin_state_code * operation,struct vdo_completion * parent,void * context)4683 void vdo_load_slab_depot(struct slab_depot *depot,
4684 			 const struct admin_state_code *operation,
4685 			 struct vdo_completion *parent, void *context)
4686 {
4687 	if (!vdo_assert_load_operation(operation, parent))
4688 		return;
4689 
4690 	vdo_schedule_operation_with_context(depot->action_manager, operation,
4691 					    load_slab_summary, load_allocator,
4692 					    NULL, context, parent);
4693 }
4694 
4695 /* Implements vdo_zone_action_fn. */
prepare_to_allocate(void * context,zone_count_t zone_number,struct vdo_completion * parent)4696 static void prepare_to_allocate(void *context, zone_count_t zone_number,
4697 				struct vdo_completion *parent)
4698 {
4699 	struct slab_depot *depot = context;
4700 	struct block_allocator *allocator = &depot->allocators[zone_number];
4701 	int result;
4702 
4703 	result = vdo_prepare_slabs_for_allocation(allocator);
4704 	if (result != VDO_SUCCESS) {
4705 		vdo_fail_completion(parent, result);
4706 		return;
4707 	}
4708 
4709 	scrub_slabs(allocator, parent);
4710 }
4711 
4712 /**
4713  * vdo_prepare_slab_depot_to_allocate() - Prepare the slab depot to come online and start
4714  *                                        allocating blocks.
4715  * @depot: The depot to prepare.
4716  * @load_type: The load type.
4717  * @parent: The completion to notify when the operation is complete.
4718  *
4719  * This method may be called only before entering normal operation from the load thread. It must be
4720  * called before allocation may proceed.
4721  */
vdo_prepare_slab_depot_to_allocate(struct slab_depot * depot,enum slab_depot_load_type load_type,struct vdo_completion * parent)4722 void vdo_prepare_slab_depot_to_allocate(struct slab_depot *depot,
4723 					enum slab_depot_load_type load_type,
4724 					struct vdo_completion *parent)
4725 {
4726 	depot->load_type = load_type;
4727 	atomic_set(&depot->zones_to_scrub, depot->zone_count);
4728 	vdo_schedule_action(depot->action_manager, NULL,
4729 			    prepare_to_allocate, NULL, parent);
4730 }
4731 
4732 /**
4733  * vdo_update_slab_depot_size() - Update the slab depot to reflect its new size in memory.
4734  * @depot: The depot to update.
4735  *
4736  * This size is saved to disk as part of the super block.
4737  */
vdo_update_slab_depot_size(struct slab_depot * depot)4738 void vdo_update_slab_depot_size(struct slab_depot *depot)
4739 {
4740 	depot->last_block = depot->new_last_block;
4741 }
4742 
4743 /**
4744  * vdo_prepare_to_grow_slab_depot() - Allocate new memory needed for a resize of a slab depot to
4745  *                                    the given size.
4746  * @depot: The depot to prepare to resize.
4747  * @partition: The new depot partition.
4748  *
4749  * Return: VDO_SUCCESS or an error.
4750  */
vdo_prepare_to_grow_slab_depot(struct slab_depot * depot,const struct partition * partition)4751 int vdo_prepare_to_grow_slab_depot(struct slab_depot *depot,
4752 				   const struct partition *partition)
4753 {
4754 	struct slab_depot_state_2_0 new_state;
4755 	int result;
4756 	slab_count_t new_slab_count;
4757 
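	/*
	 * Hypothetical example: with 2^15-block slabs, a 300000-block partition holds
	 * 300000 >> 15 = 9 whole slabs, so a depot that already has 9 slabs cannot
	 * grow into it and the resize is rejected below.
	 */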
4758 	if ((partition->count >> depot->slab_size_shift) <= depot->slab_count)
4759 		return VDO_INCREMENT_TOO_SMALL;
4760 
4761 	/* Generate the depot configuration for the new block count. */
4762 	VDO_ASSERT_LOG_ONLY(depot->first_block == partition->offset,
4763 			    "New slab depot partition doesn't change origin");
4764 	result = vdo_configure_slab_depot(partition, depot->slab_config,
4765 					  depot->zone_count, &new_state);
4766 	if (result != VDO_SUCCESS)
4767 		return result;
4768 
4769 	new_slab_count = vdo_compute_slab_count(depot->first_block,
4770 						new_state.last_block,
4771 						depot->slab_size_shift);
4772 	if (new_slab_count <= depot->slab_count)
4773 		return vdo_log_error_strerror(VDO_INCREMENT_TOO_SMALL,
4774 					      "Depot can only grow");
4775 	if (new_slab_count == depot->new_slab_count) {
4776 		/* All the new slabs have already been allocated. */
4777 		return VDO_SUCCESS;
4778 	}
4779 
4780 	vdo_abandon_new_slabs(depot);
4781 	result = allocate_slabs(depot, new_slab_count);
4782 	if (result != VDO_SUCCESS) {
4783 		vdo_abandon_new_slabs(depot);
4784 		return result;
4785 	}
4786 
4787 	depot->new_size = partition->count;
4788 	depot->old_last_block = depot->last_block;
4789 	depot->new_last_block = new_state.last_block;
4790 
4791 	return VDO_SUCCESS;
4792 }
4793 
4794 /**
4795  * finish_registration() - Finish registering new slabs now that all of the allocators have
4796  *                         received their new slabs.
4797  * @context: The slab depot.
4798  *
4799  * Implements vdo_action_conclusion_fn.
4800  */
finish_registration(void * context)4801 static int finish_registration(void *context)
4802 {
4803 	struct slab_depot *depot = context;
4804 
4805 	WRITE_ONCE(depot->slab_count, depot->new_slab_count);
4806 	vdo_free(depot->slabs);
4807 	depot->slabs = depot->new_slabs;
4808 	depot->new_slabs = NULL;
4809 	depot->new_slab_count = 0;
4810 	return VDO_SUCCESS;
4811 }
4812 
4813 /* Implements vdo_zone_action_fn. */
register_new_slabs(void * context,zone_count_t zone_number,struct vdo_completion * parent)4814 static void register_new_slabs(void *context, zone_count_t zone_number,
4815 			       struct vdo_completion *parent)
4816 {
4817 	struct slab_depot *depot = context;
4818 	struct block_allocator *allocator = &depot->allocators[zone_number];
4819 	slab_count_t i;
4820 
4821 	for (i = depot->slab_count; i < depot->new_slab_count; i++) {
4822 		struct vdo_slab *slab = depot->new_slabs[i];
4823 
4824 		if (slab->allocator == allocator)
4825 			register_slab_with_allocator(allocator, slab);
4826 	}
4827 
4828 	vdo_finish_completion(parent);
4829 }
4830 
4831 /**
4832  * vdo_use_new_slabs() - Use the new slabs allocated for resize.
4833  * @depot: The depot.
4834  * @parent: The object to notify when complete.
4835  */
vdo_use_new_slabs(struct slab_depot * depot,struct vdo_completion * parent)4836 void vdo_use_new_slabs(struct slab_depot *depot, struct vdo_completion *parent)
4837 {
4838 	VDO_ASSERT_LOG_ONLY(depot->new_slabs != NULL, "Must have new slabs to use");
4839 	vdo_schedule_operation(depot->action_manager,
4840 			       VDO_ADMIN_STATE_SUSPENDED_OPERATION,
4841 			       NULL, register_new_slabs,
4842 			       finish_registration, parent);
4843 }
4844 
4845 /**
4846  * stop_scrubbing() - Tell the scrubber to stop scrubbing after it finishes the slab it is
4847  *                    currently working on.
4848  * @allocator: The block allocator owning the scrubber to stop.
4849  */
stop_scrubbing(struct block_allocator * allocator)4850 static void stop_scrubbing(struct block_allocator *allocator)
4851 {
4852 	struct slab_scrubber *scrubber = &allocator->scrubber;
4853 
4854 	if (vdo_is_state_quiescent(&scrubber->admin_state)) {
4855 		vdo_finish_completion(&allocator->completion);
4856 	} else {
4857 		vdo_start_draining(&scrubber->admin_state,
4858 				   VDO_ADMIN_STATE_SUSPENDING,
4859 				   &allocator->completion, NULL);
4860 	}
4861 }
4862 
4863 /* Implements vdo_admin_initiator_fn. */
initiate_summary_drain(struct admin_state * state)4864 static void initiate_summary_drain(struct admin_state *state)
4865 {
4866 	check_summary_drain_complete(container_of(state, struct block_allocator,
4867 						  summary_state));
4868 }
4869 
do_drain_step(struct vdo_completion * completion)4870 static void do_drain_step(struct vdo_completion *completion)
4871 {
4872 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
4873 
4874 	vdo_prepare_completion_for_requeue(&allocator->completion, do_drain_step,
4875 					   handle_operation_error, allocator->thread_id,
4876 					   NULL);
4877 	switch (++allocator->drain_step) {
4878 	case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
4879 		stop_scrubbing(allocator);
4880 		return;
4881 
4882 	case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
4883 		apply_to_slabs(allocator, do_drain_step);
4884 		return;
4885 
4886 	case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
4887 		vdo_start_draining(&allocator->summary_state,
4888 				   vdo_get_admin_state_code(&allocator->state),
4889 				   completion, initiate_summary_drain);
4890 		return;
4891 
4892 	case VDO_DRAIN_ALLOCATOR_STEP_FINISHED:
4893 		VDO_ASSERT_LOG_ONLY(!is_vio_pool_busy(allocator->vio_pool),
4894 				    "vio pool not busy");
4895 		vdo_finish_draining_with_result(&allocator->state, completion->result);
4896 		return;
4897 
4898 	default:
4899 		vdo_finish_draining_with_result(&allocator->state, UDS_BAD_STATE);
4900 	}
4901 }
4902 
4903 /* Implements vdo_admin_initiator_fn. */
initiate_drain(struct admin_state * state)4904 static void initiate_drain(struct admin_state *state)
4905 {
4906 	struct block_allocator *allocator =
4907 		container_of(state, struct block_allocator, state);
4908 
4909 	allocator->drain_step = VDO_DRAIN_ALLOCATOR_START;
4910 	do_drain_step(&allocator->completion);
4911 }
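
/*
 * The drain steps advance forward from VDO_DRAIN_ALLOCATOR_START in do_drain_step()
 * (scrubber, then slabs, then summary, then finished), while do_resume_step() walks
 * the same steps in reverse starting from VDO_DRAIN_ALLOCATOR_STEP_FINISHED, so a
 * resume undoes a drain step for step.
 */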
4912 
4913 /*
4914  * Drain all allocator I/O. Depending upon the type of drain, some or all dirty metadata may be
4915  * written to disk. The type of drain will be determined from the state of the allocator's depot.
4916  *
4917  * Implements vdo_zone_action_fn.
4918  */
drain_allocator(void * context,zone_count_t zone_number,struct vdo_completion * parent)4919 static void drain_allocator(void *context, zone_count_t zone_number,
4920 			    struct vdo_completion *parent)
4921 {
4922 	struct slab_depot *depot = context;
4923 
4924 	vdo_start_draining(&depot->allocators[zone_number].state,
4925 			   vdo_get_current_manager_operation(depot->action_manager),
4926 			   parent, initiate_drain);
4927 }
4928 
4929 /**
4930  * vdo_drain_slab_depot() - Drain all slab depot I/O.
4931  * @depot: The depot to drain.
4932  * @operation: The drain operation (flush, rebuild, suspend, or save).
4933  * @parent: The completion to finish when the drain is complete.
4934  *
4935  * If saving or flushing, all dirty depot metadata will be written out. If saving or suspending,
4936  * the depot will be left in a suspended state.
4937  */
vdo_drain_slab_depot(struct slab_depot * depot,const struct admin_state_code * operation,struct vdo_completion * parent)4938 void vdo_drain_slab_depot(struct slab_depot *depot,
4939 			  const struct admin_state_code *operation,
4940 			  struct vdo_completion *parent)
4941 {
4942 	vdo_schedule_operation(depot->action_manager, operation,
4943 			       NULL, drain_allocator, NULL, parent);
4944 }
4945 
4946 /**
4947  * resume_scrubbing() - Tell the scrubber to resume scrubbing if it has been stopped.
4948  * @allocator: The allocator being resumed.
4949  */
resume_scrubbing(struct block_allocator * allocator)4950 static void resume_scrubbing(struct block_allocator *allocator)
4951 {
4952 	int result;
4953 	struct slab_scrubber *scrubber = &allocator->scrubber;
4954 
4955 	if (!has_slabs_to_scrub(scrubber)) {
4956 		vdo_finish_completion(&allocator->completion);
4957 		return;
4958 	}
4959 
4960 	result = vdo_resume_if_quiescent(&scrubber->admin_state);
4961 	if (result != VDO_SUCCESS) {
4962 		vdo_fail_completion(&allocator->completion, result);
4963 		return;
4964 	}
4965 
4966 	scrub_next_slab(scrubber);
4967 	vdo_finish_completion(&allocator->completion);
4968 }
4969 
do_resume_step(struct vdo_completion * completion)4970 static void do_resume_step(struct vdo_completion *completion)
4971 {
4972 	struct block_allocator *allocator = vdo_as_block_allocator(completion);
4973 
4974 	vdo_prepare_completion_for_requeue(&allocator->completion, do_resume_step,
4975 					   handle_operation_error,
4976 					   allocator->thread_id, NULL);
4977 	switch (--allocator->drain_step) {
4978 	case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
4979 		vdo_fail_completion(completion,
4980 				    vdo_resume_if_quiescent(&allocator->summary_state));
4981 		return;
4982 
4983 	case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
4984 		apply_to_slabs(allocator, do_resume_step);
4985 		return;
4986 
4987 	case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
4988 		resume_scrubbing(allocator);
4989 		return;
4990 
4991 	case VDO_DRAIN_ALLOCATOR_START:
4992 		vdo_finish_resuming_with_result(&allocator->state, completion->result);
4993 		return;
4994 
4995 	default:
4996 		vdo_finish_resuming_with_result(&allocator->state, UDS_BAD_STATE);
4997 	}
4998 }
4999 
5000 /* Implements vdo_admin_initiator_fn. */
initiate_resume(struct admin_state * state)5001 static void initiate_resume(struct admin_state *state)
5002 {
5003 	struct block_allocator *allocator =
5004 		container_of(state, struct block_allocator, state);
5005 
5006 	allocator->drain_step = VDO_DRAIN_ALLOCATOR_STEP_FINISHED;
5007 	do_resume_step(&allocator->completion);
5008 }
5009 
5010 /* Implements vdo_zone_action_fn. */
resume_allocator(void * context,zone_count_t zone_number,struct vdo_completion * parent)5011 static void resume_allocator(void *context, zone_count_t zone_number,
5012 			     struct vdo_completion *parent)
5013 {
5014 	struct slab_depot *depot = context;
5015 
5016 	vdo_start_resuming(&depot->allocators[zone_number].state,
5017 			   vdo_get_current_manager_operation(depot->action_manager),
5018 			   parent, initiate_resume);
5019 }
5020 
5021 /**
5022  * vdo_resume_slab_depot() - Resume a suspended slab depot.
5023  * @depot: The depot to resume.
5024  * @parent: The completion to finish when the depot has resumed.
5025  */
vdo_resume_slab_depot(struct slab_depot * depot,struct vdo_completion * parent)5026 void vdo_resume_slab_depot(struct slab_depot *depot, struct vdo_completion *parent)
5027 {
5028 	if (vdo_is_read_only(depot->vdo)) {
5029 		vdo_continue_completion(parent, VDO_READ_ONLY);
5030 		return;
5031 	}
5032 
5033 	vdo_schedule_operation(depot->action_manager, VDO_ADMIN_STATE_RESUMING,
5034 			       NULL, resume_allocator, NULL, parent);
5035 }
5036 
5037 /**
5038  * vdo_commit_oldest_slab_journal_tail_blocks() - Commit all dirty tail blocks which are locking a
5039  *                                                given recovery journal block.
5040  * @depot: The depot.
5041  * @recovery_block_number: The sequence number of the recovery journal block whose locks should be
5042  *                         released.
5043  *
5044  * Context: This method must be called from the journal zone thread.
5045  */
vdo_commit_oldest_slab_journal_tail_blocks(struct slab_depot * depot,sequence_number_t recovery_block_number)5046 void vdo_commit_oldest_slab_journal_tail_blocks(struct slab_depot *depot,
5047 						sequence_number_t recovery_block_number)
5048 {
5049 	if (depot == NULL)
5050 		return;
5051 
5052 	depot->new_release_request = recovery_block_number;
5053 	vdo_schedule_default_action(depot->action_manager);
5054 }
5055 
5056 /* Implements vdo_zone_action_fn. */
scrub_all_unrecovered_slabs(void * context,zone_count_t zone_number,struct vdo_completion * parent)5057 static void scrub_all_unrecovered_slabs(void *context, zone_count_t zone_number,
5058 					struct vdo_completion *parent)
5059 {
5060 	struct slab_depot *depot = context;
5061 
5062 	scrub_slabs(&depot->allocators[zone_number], NULL);
5063 	vdo_launch_completion(parent);
5064 }
5065 
5066 /**
5067  * vdo_scrub_all_unrecovered_slabs() - Scrub all unrecovered slabs.
5068  * @depot: The depot to scrub.
5069  * @parent: The object to notify when scrubbing has been launched for all zones.
5070  */
vdo_scrub_all_unrecovered_slabs(struct slab_depot * depot,struct vdo_completion * parent)5071 void vdo_scrub_all_unrecovered_slabs(struct slab_depot *depot,
5072 				     struct vdo_completion *parent)
5073 {
5074 	vdo_schedule_action(depot->action_manager, NULL,
5075 			    scrub_all_unrecovered_slabs,
5076 			    NULL, parent);
5077 }
5078 
5079 /**
5080  * get_block_allocator_statistics() - Get the total of the statistics from all the block allocators
5081  *                                    in the depot.
5082  * @depot: The slab depot.
5083  *
5084  * Return: The statistics from all block allocators in the depot.
5085  */
5086 static struct block_allocator_statistics __must_check
get_block_allocator_statistics(const struct slab_depot * depot)5087 get_block_allocator_statistics(const struct slab_depot *depot)
5088 {
5089 	struct block_allocator_statistics totals;
5090 	zone_count_t zone;
5091 
5092 	memset(&totals, 0, sizeof(totals));
5093 
5094 	for (zone = 0; zone < depot->zone_count; zone++) {
5095 		const struct block_allocator *allocator = &depot->allocators[zone];
5096 		const struct block_allocator_statistics *stats = &allocator->statistics;
5097 
5098 		totals.slab_count += allocator->slab_count;
5099 		totals.slabs_opened += READ_ONCE(stats->slabs_opened);
5100 		totals.slabs_reopened += READ_ONCE(stats->slabs_reopened);
5101 	}
5102 
5103 	return totals;
5104 }
5105 
5106 /**
5107  * get_ref_counts_statistics() - Get the cumulative ref_counts statistics for the depot.
5108  * @depot: The slab depot.
5109  *
5110  * Return: The cumulative statistics for all ref_counts in the depot.
5111  */
5112 static struct ref_counts_statistics __must_check
get_ref_counts_statistics(const struct slab_depot * depot)5113 get_ref_counts_statistics(const struct slab_depot *depot)
5114 {
5115 	struct ref_counts_statistics totals;
5116 	zone_count_t zone;
5117 
5118 	memset(&totals, 0, sizeof(totals));
5119 
5120 	for (zone = 0; zone < depot->zone_count; zone++) {
5121 		totals.blocks_written +=
5122 			READ_ONCE(depot->allocators[zone].ref_counts_statistics.blocks_written);
5123 	}
5124 
5125 	return totals;
5126 }
5127 
5128 /**
5129  * get_slab_journal_statistics() - Get the aggregated slab journal statistics for the depot.
5130  * @depot: The slab depot.
5131  *
5132  * Return: The aggregated statistics for all slab journals in the depot.
5133  */
5134 static struct slab_journal_statistics __must_check
get_slab_journal_statistics(const struct slab_depot * depot)5135 get_slab_journal_statistics(const struct slab_depot *depot)
5136 {
5137 	struct slab_journal_statistics totals;
5138 	zone_count_t zone;
5139 
5140 	memset(&totals, 0, sizeof(totals));
5141 
5142 	for (zone = 0; zone < depot->zone_count; zone++) {
5143 		const struct slab_journal_statistics *stats =
5144 			&depot->allocators[zone].slab_journal_statistics;
5145 
5146 		totals.disk_full_count += READ_ONCE(stats->disk_full_count);
5147 		totals.flush_count += READ_ONCE(stats->flush_count);
5148 		totals.blocked_count += READ_ONCE(stats->blocked_count);
5149 		totals.blocks_written += READ_ONCE(stats->blocks_written);
5150 		totals.tail_busy_count += READ_ONCE(stats->tail_busy_count);
5151 	}
5152 
5153 	return totals;
5154 }
5155 
5156 /**
5157  * vdo_get_slab_depot_statistics() - Get all the vdo_statistics fields that are properties of the
5158  *                                   slab depot.
5159  * @depot: The slab depot.
5160  * @stats: The vdo statistics structure to partially fill.
5161  */
vdo_get_slab_depot_statistics(const struct slab_depot * depot,struct vdo_statistics * stats)5162 void vdo_get_slab_depot_statistics(const struct slab_depot *depot,
5163 				   struct vdo_statistics *stats)
5164 {
5165 	slab_count_t slab_count = READ_ONCE(depot->slab_count);
5166 	slab_count_t unrecovered = 0;
5167 	zone_count_t zone;
5168 
5169 	for (zone = 0; zone < depot->zone_count; zone++) {
5170 		/* The allocators are responsible for thread safety. */
5171 		unrecovered += READ_ONCE(depot->allocators[zone].scrubber.slab_count);
5172 	}
5173 
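	/*
	 * Integer percentage: for example, 1000 slabs with 40 still awaiting
	 * scrubbing reports (1000 - 40) * 100 / 1000 = 96.
	 */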
5174 	stats->recovery_percentage = (slab_count - unrecovered) * 100 / slab_count;
5175 	stats->allocator = get_block_allocator_statistics(depot);
5176 	stats->ref_counts = get_ref_counts_statistics(depot);
5177 	stats->slab_journal = get_slab_journal_statistics(depot);
5178 	stats->slab_summary = (struct slab_summary_statistics) {
5179 		.blocks_written = atomic64_read(&depot->summary_statistics.blocks_written),
5180 	};
5181 }
5182 
5183 /**
5184  * vdo_dump_slab_depot() - Dump the slab depot, in a thread-unsafe fashion.
5185  * @depot: The slab depot.
5186  */
vdo_dump_slab_depot(const struct slab_depot * depot)5187 void vdo_dump_slab_depot(const struct slab_depot *depot)
5188 {
5189 	vdo_log_info("vdo slab depot");
5190 	vdo_log_info("  zone_count=%u old_zone_count=%u slab_count=%u active_release_request=%llu new_release_request=%llu",
5191 		     (unsigned int) depot->zone_count,
5192 		     (unsigned int) depot->old_zone_count, READ_ONCE(depot->slab_count),
5193 		     (unsigned long long) depot->active_release_request,
5194 		     (unsigned long long) depot->new_release_request);
5195 }
5196