xref: /linux/drivers/md/dm-vdo/recovery-journal.c (revision d358e5254674b70f34c847715ca509e46eb81e6f)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "recovery-journal.h"
7 
8 #include <linux/atomic.h>
9 #include <linux/bio.h>
10 
11 #include "logger.h"
12 #include "memory-alloc.h"
13 #include "permassert.h"
14 
15 #include "block-map.h"
16 #include "completion.h"
17 #include "constants.h"
18 #include "data-vio.h"
19 #include "encodings.h"
20 #include "io-submitter.h"
21 #include "slab-depot.h"
22 #include "types.h"
23 #include "vdo.h"
24 #include "vio.h"
25 #include "wait-queue.h"
26 
27 static const u64 RECOVERY_COUNT_MASK = 0xff;
28 
29 /*
30  * The number of reserved blocks must be large enough to prevent a new recovery journal
31  * block write from overwriting a block which appears to still be a valid head block of the
32  * journal. Currently, that means reserving enough space for all 2048 data_vios.
33  */
34 #define RECOVERY_JOURNAL_RESERVED_BLOCKS				\
35 	((MAXIMUM_VDO_USER_VIOS / RECOVERY_JOURNAL_ENTRIES_PER_BLOCK) + 2)
36 
37 /**
38  * DOC: Lock Counters.
39  *
 * A lock_counter is intended to keep all of the locks for the blocks in the recovery journal. The
 * per-zone counters are all kept in a single array which is arranged by zone (i.e. zone 0's lock 0
 * is at index 0, zone 0's lock 1 is at index 1, and zone 1's lock 0 is at index 'locks'). This
 * arrangement is intended to minimize cache-line contention for counters from different zones.
44  *
45  * The locks are implemented as a single object instead of as a lock counter per lock both to
46  * afford this opportunity to reduce cache line contention and also to eliminate the need to have a
47  * completion per lock.
48  *
49  * Lock sets are laid out with the set for recovery journal first, followed by the logical zones,
50  * and then the physical zones.
51  */
52 
/* The notification state of a lock counter. */
enum lock_counter_state {
	/* No notification is in flight; a release may launch one. */
	LOCK_COUNTER_STATE_NOT_NOTIFYING,
	/* A notification completion has been launched and not yet acknowledged. */
	LOCK_COUNTER_STATE_NOTIFYING,
	/* Notifications are disabled (set while the journal is draining). */
	LOCK_COUNTER_STATE_SUSPENDED,
};
58 
59 /**
60  * get_zone_count_ptr() - Get a pointer to the zone count for a given lock on a given zone.
61  * @journal: The recovery journal.
62  * @lock_number: The lock to get.
63  * @zone_type: The zone type whose count is desired.
64  *
65  * Return: A pointer to the zone count for the given lock and zone.
66  */
get_zone_count_ptr(struct recovery_journal * journal,block_count_t lock_number,enum vdo_zone_type zone_type)67 static inline atomic_t *get_zone_count_ptr(struct recovery_journal *journal,
68 					   block_count_t lock_number,
69 					   enum vdo_zone_type zone_type)
70 {
71 	return ((zone_type == VDO_ZONE_TYPE_LOGICAL)
72 		? &journal->lock_counter.logical_zone_counts[lock_number]
73 		: &journal->lock_counter.physical_zone_counts[lock_number]);
74 }
75 
76 /**
77  * get_counter() - Get the zone counter for a given lock on a given zone.
78  * @journal: The recovery journal.
79  * @lock_number: The lock to get.
80  * @zone_type: The zone type whose count is desired.
81  * @zone_id: The zone index whose count is desired.
82  *
83  * Return: The counter for the given lock and zone.
84  */
get_counter(struct recovery_journal * journal,block_count_t lock_number,enum vdo_zone_type zone_type,zone_count_t zone_id)85 static inline u16 *get_counter(struct recovery_journal *journal,
86 			       block_count_t lock_number, enum vdo_zone_type zone_type,
87 			       zone_count_t zone_id)
88 {
89 	struct lock_counter *counter = &journal->lock_counter;
90 	block_count_t zone_counter = (counter->locks * zone_id) + lock_number;
91 
92 	if (zone_type == VDO_ZONE_TYPE_JOURNAL)
93 		return &counter->journal_counters[zone_counter];
94 
95 	if (zone_type == VDO_ZONE_TYPE_LOGICAL)
96 		return &counter->logical_counters[zone_counter];
97 
98 	return &counter->physical_counters[zone_counter];
99 }
100 
get_decrement_counter(struct recovery_journal * journal,block_count_t lock_number)101 static atomic_t *get_decrement_counter(struct recovery_journal *journal,
102 				       block_count_t lock_number)
103 {
104 	return &journal->lock_counter.journal_decrement_counts[lock_number];
105 }
106 
/**
 * is_journal_zone_locked() - Check whether the journal zone is locked for a given lock.
 * @journal: The recovery journal.
 * @lock_number: The lock to check.
 *
 * The lock is held as long as not every increment has been matched by a decrement.
 *
 * Return: True if the journal zone is locked.
 */
static bool is_journal_zone_locked(struct recovery_journal *journal,
				   block_count_t lock_number)
{
	/* The journal zone's own (non-atomic) count of references taken. */
	u16 journal_value = *get_counter(journal, lock_number, VDO_ZONE_TYPE_JOURNAL, 0);
	/* Decrements are recorded atomically — presumably from other threads; confirm. */
	u32 decrements = atomic_read(get_decrement_counter(journal, lock_number));

	/* Pairs with barrier in vdo_release_journal_entry_lock() */
	smp_rmb();
	VDO_ASSERT_LOG_ONLY((decrements <= journal_value),
			    "journal zone lock counter must not underflow");
	return (journal_value != decrements);
}
126 
/**
 * vdo_release_recovery_journal_block_reference() - Release a reference to a recovery journal
 *                                                  block.
 * @journal: The recovery journal.
 * @sequence_number: The journal sequence number of the referenced block.
 * @zone_type: The type of the zone making the adjustment.
 * @zone_id: The ID of the zone making the adjustment.
 *
 * If this is the last reference for a given zone type, an attempt will be made to reap the
 * journal.
 */
void vdo_release_recovery_journal_block_reference(struct recovery_journal *journal,
						  sequence_number_t sequence_number,
						  enum vdo_zone_type zone_type,
						  zone_count_t zone_id)
{
	u16 *current_value;
	block_count_t lock_number;
	int prior_state;

	/* Sequence number 0 indicates no block is referenced; nothing to release. */
	if (sequence_number == 0)
		return;

	lock_number = vdo_get_recovery_journal_block_number(journal, sequence_number);
	current_value = get_counter(journal, lock_number, zone_type, zone_id);

	VDO_ASSERT_LOG_ONLY((*current_value >= 1),
			    "decrement of lock counter must not underflow");
	*current_value -= 1;

	if (zone_type == VDO_ZONE_TYPE_JOURNAL) {
		/* The journal zone also tracks asynchronous decrements; check both. */
		if (is_journal_zone_locked(journal, lock_number))
			return;
	} else {
		atomic_t *zone_count;

		/* This zone still holds other references to the lock. */
		if (*current_value != 0)
			return;

		zone_count = get_zone_count_ptr(journal, lock_number, zone_type);

		/* Another zone of this type still holds the lock. */
		if (atomic_add_return(-1, zone_count) > 0)
			return;
	}

	/*
	 * Extra barriers because this was originally developed using a CAS operation that
	 * implicitly had them.
	 */
	smp_mb__before_atomic();
	prior_state = atomic_cmpxchg(&journal->lock_counter.state,
				     LOCK_COUNTER_STATE_NOT_NOTIFYING,
				     LOCK_COUNTER_STATE_NOTIFYING);
	/* same as before_atomic */
	smp_mb__after_atomic();

	/* Only launch a notification if none is already in flight (or suspended). */
	if (prior_state != LOCK_COUNTER_STATE_NOT_NOTIFYING)
		return;

	vdo_launch_completion(&journal->lock_counter.completion);
}
188 
get_journal_block(struct list_head * list)189 static inline struct recovery_journal_block * __must_check get_journal_block(struct list_head *list)
190 {
191 	return list_first_entry_or_null(list, struct recovery_journal_block, list_node);
192 }
193 
194 /**
195  * pop_free_list() - Get a block from the end of the free list.
196  * @journal: The journal.
197  *
198  * Return: The block or NULL if the list is empty.
199  */
pop_free_list(struct recovery_journal * journal)200 static struct recovery_journal_block * __must_check pop_free_list(struct recovery_journal *journal)
201 {
202 	struct recovery_journal_block *block;
203 
204 	if (list_empty(&journal->free_tail_blocks))
205 		return NULL;
206 
207 	block = list_last_entry(&journal->free_tail_blocks,
208 				struct recovery_journal_block, list_node);
209 	list_del_init(&block->list_node);
210 	return block;
211 }
212 
213 /**
214  * is_block_dirty() - Check whether a recovery block is dirty.
215  * @block: The block to check.
216  *
217  * Indicates it has any uncommitted entries, which includes both entries not written and entries
218  * written but not yet acknowledged.
219  *
220  * Return: True if the block has any uncommitted entries.
221  */
is_block_dirty(const struct recovery_journal_block * block)222 static inline bool __must_check is_block_dirty(const struct recovery_journal_block *block)
223 {
224 	return (block->uncommitted_entry_count > 0);
225 }
226 
227 /**
228  * is_block_empty() - Check whether a journal block is empty.
229  * @block: The block to check.
230  *
231  * Return: True if the block has no entries.
232  */
is_block_empty(const struct recovery_journal_block * block)233 static inline bool __must_check is_block_empty(const struct recovery_journal_block *block)
234 {
235 	return (block->entry_count == 0);
236 }
237 
238 /**
239  * is_block_full() - Check whether a journal block is full.
240  * @block: The block to check.
241  *
242  * Return: True if the block is full.
243  */
is_block_full(const struct recovery_journal_block * block)244 static inline bool __must_check is_block_full(const struct recovery_journal_block *block)
245 {
246 	return ((block == NULL) || (block->journal->entries_per_block == block->entry_count));
247 }
248 
/**
 * assert_on_journal_thread() - Assert that we are running on the journal thread.
 * @journal: The journal.
 * @function_name: The function doing the check (for logging).
 *
 * NOTE(review): the log message fires when the check fails, i.e. when the caller
 * is NOT on the journal thread — the wording looks inverted; confirm upstream intent.
 */
static void assert_on_journal_thread(struct recovery_journal *journal,
				     const char *function_name)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == journal->thread_id),
			    "%s() called on journal thread", function_name);
}
260 
/**
 * continue_waiter() - Release a data_vio from the journal.
 * @waiter: The data_vio waiting on journal activity.
 * @context: Pointer to the int result of the journal operation.
 *
 * Invoked whenever a data_vio is to be released from the journal, either because its
 * entry was committed to disk, or because there was an error.
 * Implements waiter_callback_fn.
 */
static void continue_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	int result = *((int *) context);

	continue_data_vio_with_error(data_vio, result);
}
273 
274 /**
275  * has_block_waiters() - Check whether the journal has any waiters on any blocks.
276  * @journal: The journal in question.
277  *
278  * Return: True if any block has a waiter.
279  */
has_block_waiters(struct recovery_journal * journal)280 static inline bool has_block_waiters(struct recovery_journal *journal)
281 {
282 	struct recovery_journal_block *block = get_journal_block(&journal->active_tail_blocks);
283 
284 	/*
285 	 * Either the first active tail block (if it exists) has waiters, or no active tail block
286 	 * has waiters.
287 	 */
288 	return ((block != NULL) &&
289 		(vdo_waitq_has_waiters(&block->entry_waiters) ||
290 		 vdo_waitq_has_waiters(&block->commit_waiters)));
291 }
292 
/* Forward declarations for functions referenced before their definitions. */
static void recycle_journal_blocks(struct recovery_journal *journal);
static void recycle_journal_block(struct recovery_journal_block *block);
static void notify_commit_waiters(struct recovery_journal *journal);
296 
/**
 * suspend_lock_counter() - Prevent the lock counter from notifying.
 * @counter: The counter.
 *
 * Return: True if the lock counter was not notifying and hence the suspend was efficacious.
 */
static bool suspend_lock_counter(struct lock_counter *counter)
{
	int prior_state;

	/*
	 * Extra barriers because this was originally developed using a CAS operation that
	 * implicitly had them.
	 */
	smp_mb__before_atomic();
	prior_state = atomic_cmpxchg(&counter->state, LOCK_COUNTER_STATE_NOT_NOTIFYING,
				     LOCK_COUNTER_STATE_SUSPENDED);
	/* same as before_atomic */
	smp_mb__after_atomic();

	/*
	 * Success either because this call performed the suspend, or because the counter
	 * was already suspended. A counter mid-notification cannot be suspended.
	 */
	return ((prior_state == LOCK_COUNTER_STATE_SUSPENDED) ||
		(prior_state == LOCK_COUNTER_STATE_NOT_NOTIFYING));
}
320 
is_read_only(struct recovery_journal * journal)321 static inline bool is_read_only(struct recovery_journal *journal)
322 {
323 	return vdo_is_read_only(journal->flush_vio->completion.vdo);
324 }
325 
/**
 * check_for_drain_complete() - Check whether the journal has drained.
 * @journal: The journal which may have just drained.
 *
 * If the vdo is read-only, first flushes out all journal state so that waiters are
 * released with VDO_READ_ONLY. Then, if the journal is draining and has no remaining
 * work (no reap in progress, no waiters, and the lock counter could be suspended),
 * finishes the drain.
 */
static void check_for_drain_complete(struct recovery_journal *journal)
{
	int result = VDO_SUCCESS;

	if (is_read_only(journal)) {
		result = VDO_READ_ONLY;
		/*
		 * Clean up any full active blocks which were not written due to read-only mode.
		 *
		 * FIXME: This would probably be better as a short-circuit in write_block().
		 */
		notify_commit_waiters(journal);
		recycle_journal_blocks(journal);

		/* Release any data_vios waiting to be assigned entries. */
		vdo_waitq_notify_all_waiters(&journal->entry_waiters,
					     continue_waiter, &result);
	}

	/* Any of these conditions means the drain (if any) cannot complete yet. */
	if (!vdo_is_state_draining(&journal->state) ||
	    journal->reaping ||
	    has_block_waiters(journal) ||
	    vdo_waitq_has_waiters(&journal->entry_waiters) ||
	    !suspend_lock_counter(&journal->lock_counter))
		return;

	if (vdo_is_state_saving(&journal->state)) {
		if (journal->active_block != NULL) {
			/* A save requires the active block to be clean (unless read-only). */
			VDO_ASSERT_LOG_ONLY(((result == VDO_READ_ONLY) ||
					     !is_block_dirty(journal->active_block)),
					    "journal being saved has clean active block");
			recycle_journal_block(journal->active_block);
		}

		VDO_ASSERT_LOG_ONLY(list_empty(&journal->active_tail_blocks),
				    "all blocks in a journal being saved must be inactive");
	}

	vdo_finish_draining_with_result(&journal->state, result);
}
370 
/**
 * notify_recovery_journal_of_read_only_mode() - Notify a recovery journal that the VDO has gone
 *                                               read-only.
 * @listener: The journal.
 * @parent: The completion to notify in order to acknowledge the notification.
 *
 * Implements vdo_read_only_notification_fn.
 */
static void notify_recovery_journal_of_read_only_mode(void *listener,
						      struct vdo_completion *parent)
{
	struct recovery_journal *journal = listener;

	check_for_drain_complete(journal);
	vdo_finish_completion(parent);
}
385 
/**
 * enter_journal_read_only_mode() - Put the journal in read-only mode.
 * @journal: The journal which has failed.
 * @error_code: The error result triggering this call.
 *
 * All attempts to add entries after this function is called will fail. All VIOs waiting for
 * commits will be awakened with an error.
 */
static void enter_journal_read_only_mode(struct recovery_journal *journal,
					 int error_code)
{
	vdo_enter_read_only_mode(journal->flush_vio->completion.vdo, error_code);
	/* Entering read-only mode may be the last thing a drain was waiting on. */
	check_for_drain_complete(journal);
}
400 
401 /**
402  * vdo_get_recovery_journal_current_sequence_number() - Obtain the recovery journal's current
403  *                                                      sequence number.
404  * @journal: The journal in question.
405  *
406  * Exposed only so the block map can be initialized therefrom.
407  *
408  * Return: The sequence number of the tail block.
409  */
vdo_get_recovery_journal_current_sequence_number(struct recovery_journal * journal)410 sequence_number_t vdo_get_recovery_journal_current_sequence_number(struct recovery_journal *journal)
411 {
412 	return journal->tail;
413 }
414 
415 /**
416  * get_recovery_journal_head() - Get the head of the recovery journal.
417  * @journal: The journal.
418  *
419  * The head is the lowest sequence number of the block map head and the slab journal head.
420  *
421  * Return: The head of the journal.
422  */
get_recovery_journal_head(const struct recovery_journal * journal)423 static inline sequence_number_t get_recovery_journal_head(const struct recovery_journal *journal)
424 {
425 	return min(journal->block_map_head, journal->slab_journal_head);
426 }
427 
428 /**
429  * compute_recovery_count_byte() - Compute the recovery count byte for a given recovery count.
430  * @recovery_count: The recovery count.
431  *
432  * Return: The byte corresponding to the recovery count.
433  */
compute_recovery_count_byte(u64 recovery_count)434 static inline u8 __must_check compute_recovery_count_byte(u64 recovery_count)
435 {
436 	return (u8)(recovery_count & RECOVERY_COUNT_MASK);
437 }
438 
439 /**
440  * check_slab_journal_commit_threshold() - Check whether the journal is over the threshold, and if
441  *                                         so, force the oldest slab journal tail block to commit.
442  * @journal: The journal.
443  */
check_slab_journal_commit_threshold(struct recovery_journal * journal)444 static void check_slab_journal_commit_threshold(struct recovery_journal *journal)
445 {
446 	block_count_t current_length = journal->tail - journal->slab_journal_head;
447 
448 	if (current_length > journal->slab_journal_commit_threshold) {
449 		journal->events.slab_journal_commits_requested++;
450 		vdo_commit_oldest_slab_journal_tail_blocks(journal->depot,
451 							   journal->slab_journal_head);
452 	}
453 }
454 
/* Forward declarations for functions referenced before their definitions. */
static void reap_recovery_journal(struct recovery_journal *journal);
static void assign_entries(struct recovery_journal *journal);
457 
/**
 * finish_reaping() - Finish reaping the journal.
 * @journal: The journal being reaped.
 *
 * Advances the heads to the reap heads, reclaims the freed space, and then resumes
 * any work (entry assignment, drain) that may have been blocked on the reap.
 */
static void finish_reaping(struct recovery_journal *journal)
{
	block_count_t blocks_reaped;
	sequence_number_t old_head = get_recovery_journal_head(journal);

	journal->block_map_head = journal->block_map_reap_head;
	journal->slab_journal_head = journal->slab_journal_reap_head;
	blocks_reaped = get_recovery_journal_head(journal) - old_head;
	journal->available_space += blocks_reaped * journal->entries_per_block;
	/* Clear the reaping flag before anything which might check it. */
	journal->reaping = false;
	check_slab_journal_commit_threshold(journal);
	assign_entries(journal);
	check_for_drain_complete(journal);
}
476 
/**
 * complete_reaping() - Finish reaping the journal after flushing the lower layer.
 * @completion: The journal's flush VIO.
 *
 * This is the callback registered in reap_recovery_journal().
 */
static void complete_reaping(struct vdo_completion *completion)
{
	struct recovery_journal *journal = completion->parent;

	finish_reaping(journal);

	/* Try reaping again in case more locks were released while flush was out. */
	reap_recovery_journal(journal);
}
492 
/**
 * handle_flush_error() - Handle an error when flushing the lower layer due to reaping.
 * @completion: The journal's flush VIO.
 *
 * A failed flush is fatal for the journal: record the error and go read-only.
 */
static void handle_flush_error(struct vdo_completion *completion)
{
	struct recovery_journal *journal = completion->parent;

	vio_record_metadata_io_error(as_vio(completion));
	/* The reap is abandoned; clear the flag so a drain can complete. */
	journal->reaping = false;
	enter_journal_read_only_mode(journal, completion->result);
}
505 
/*
 * flush_endio() - Bio completion callback for the journal's flush vio.
 *
 * Bounces processing back to the journal thread, where complete_reaping() will
 * finish the reap.
 */
static void flush_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct recovery_journal *journal = vio->completion.parent;

	continue_vio_after_io(vio, complete_reaping, journal->thread_id);
}
513 
514 /**
515  * initialize_journal_state() - Set all journal fields appropriately to start journaling from the
516  *                              current active block.
517  * @journal: The journal to be reset based on its active block.
518  */
initialize_journal_state(struct recovery_journal * journal)519 static void initialize_journal_state(struct recovery_journal *journal)
520 {
521 	journal->append_point.sequence_number = journal->tail;
522 	journal->last_write_acknowledged = journal->tail;
523 	journal->block_map_head = journal->tail;
524 	journal->slab_journal_head = journal->tail;
525 	journal->block_map_reap_head = journal->tail;
526 	journal->slab_journal_reap_head = journal->tail;
527 	journal->block_map_head_block_number =
528 		vdo_get_recovery_journal_block_number(journal, journal->block_map_head);
529 	journal->slab_journal_head_block_number =
530 		vdo_get_recovery_journal_block_number(journal,
531 						      journal->slab_journal_head);
532 	journal->available_space =
533 		(journal->entries_per_block * vdo_get_recovery_journal_length(journal->size));
534 }
535 
536 /**
537  * vdo_get_recovery_journal_length() - Get the number of usable recovery journal blocks.
538  * @journal_size: The size of the recovery journal in blocks.
539  *
540  * Return: The number of recovery journal blocks usable for entries.
541  */
vdo_get_recovery_journal_length(block_count_t journal_size)542 block_count_t vdo_get_recovery_journal_length(block_count_t journal_size)
543 {
544 	block_count_t reserved_blocks = journal_size / 4;
545 
546 	if (reserved_blocks > RECOVERY_JOURNAL_RESERVED_BLOCKS)
547 		reserved_blocks = RECOVERY_JOURNAL_RESERVED_BLOCKS;
548 	return (journal_size - reserved_blocks);
549 }
550 
/**
 * reap_recovery_journal_callback() - Attempt to reap the journal.
 * @completion: The lock counter completion.
 *
 * Attempts to reap the journal now that all the locks on some journal block have been released.
 * This is the callback registered with the lock counter.
 */
static void reap_recovery_journal_callback(struct vdo_completion *completion)
{
	struct recovery_journal *journal = (struct recovery_journal *) completion->parent;
	/*
	 * The acknowledgment must be done before reaping so that there is no race between
	 * acknowledging the notification and unlocks wishing to notify.
	 */
	smp_wmb();
	atomic_set(&journal->lock_counter.state, LOCK_COUNTER_STATE_NOT_NOTIFYING);

	if (vdo_is_state_quiescing(&journal->state)) {
		/*
		 * Don't start reaping when the journal is trying to quiesce. Do check if this
		 * notification is the last thing the drain is waiting on.
		 */
		check_for_drain_complete(journal);
		return;
	}

	reap_recovery_journal(journal);
	check_slab_journal_commit_threshold(journal);
}
580 
/**
 * initialize_lock_counter() - Initialize a lock counter.
 *
 * @journal: The recovery journal.
 * @vdo: The vdo.
 *
 * Allocates one counter per lock for the journal zone, one per lock per logical zone,
 * and one per lock per physical zone, plus the per-lock atomic zone counts. On error,
 * any arrays already allocated are left for vdo_free_recovery_journal() to release
 * (callers free the whole journal on failure).
 *
 * Return: VDO_SUCCESS or an error.
 */
static int __must_check initialize_lock_counter(struct recovery_journal *journal,
						struct vdo *vdo)
{
	int result;
	struct thread_config *config = &vdo->thread_config;
	struct lock_counter *counter = &journal->lock_counter;

	result = vdo_allocate(journal->size, u16, __func__, &counter->journal_counters);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(journal->size, atomic_t, __func__,
			      &counter->journal_decrement_counts);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(journal->size * config->logical_zone_count, u16, __func__,
			      &counter->logical_counters);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(journal->size, atomic_t, __func__,
			      &counter->logical_zone_counts);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(journal->size * config->physical_zone_count, u16, __func__,
			      &counter->physical_counters);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(journal->size, atomic_t, __func__,
			      &counter->physical_zone_counts);
	if (result != VDO_SUCCESS)
		return result;

	/* The completion which runs reap_recovery_journal_callback() on the journal thread. */
	vdo_initialize_completion(&counter->completion, vdo,
				  VDO_LOCK_COUNTER_COMPLETION);
	vdo_prepare_completion(&counter->completion, reap_recovery_journal_callback,
			       reap_recovery_journal_callback, config->journal_thread,
			       journal);
	counter->logical_zones = config->logical_zone_count;
	counter->physical_zones = config->physical_zone_count;
	counter->locks = journal->size;
	return VDO_SUCCESS;
}
635 
/**
 * set_journal_tail() - Set the journal's tail sequence number.
 * @journal: The journal whose tail is to be set.
 * @tail: The new tail value.
 *
 * A tail at or above 2^48 overflows the sequence number space and forces the
 * journal into read-only mode.
 */
static void set_journal_tail(struct recovery_journal *journal, sequence_number_t tail)
{
	/* VDO does not support sequence numbers of 1 << 48 or above in the slab journal. */
	if (tail >= (1ULL << 48))
		enter_journal_read_only_mode(journal, VDO_JOURNAL_OVERFLOW);

	journal->tail = tail;
}
649 
/**
 * initialize_recovery_block() - Initialize a journal block.
 * @vdo: The vdo from which to construct vios.
 * @journal: The journal to which the block will belong.
 * @block: The block to initialize.
 *
 * Allocates the block's data buffer and vio, and adds the block to the journal's
 * free list. On vio allocation failure, the data buffer is freed here; otherwise
 * ownership of the buffer passes to the vio.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int initialize_recovery_block(struct vdo *vdo, struct recovery_journal *journal,
				     struct recovery_journal_block *block)
{
	char *data;
	int result;

	/*
	 * Ensure that a block is large enough to store RECOVERY_JOURNAL_ENTRIES_PER_BLOCK entries.
	 */
	BUILD_BUG_ON(RECOVERY_JOURNAL_ENTRIES_PER_BLOCK >
		     ((VDO_BLOCK_SIZE - sizeof(struct packed_journal_header)) /
		      sizeof(struct packed_recovery_journal_entry)));

	/*
	 * Allocate a full block for the journal block even though not all of the space is used
	 * since the VIO needs to write a full disk block.
	 */
	result = vdo_allocate(VDO_BLOCK_SIZE, char, __func__, &data);
	if (result != VDO_SUCCESS)
		return result;

	result = allocate_vio_components(vdo, VIO_TYPE_RECOVERY_JOURNAL,
					 VIO_PRIORITY_HIGH, block, 1, data, &block->vio);
	if (result != VDO_SUCCESS) {
		vdo_free(data);
		return result;
	}

	list_add_tail(&block->list_node, &journal->free_tail_blocks);
	block->journal = journal;
	return VDO_SUCCESS;
}
690 
691 /**
692  * vdo_decode_recovery_journal() - Make a recovery journal and initialize it with the state that
693  *                                 was decoded from the super block.
694  *
695  * @state: The decoded state of the journal.
696  * @nonce: The nonce of the VDO.
697  * @vdo: The VDO.
698  * @partition: The partition for the journal.
699  * @recovery_count: The VDO's number of completed recoveries.
700  * @journal_size: The number of blocks in the journal on disk.
701  * @journal_ptr: The pointer to hold the new recovery journal.
702  *
703  * Return: A success or error code.
704  */
vdo_decode_recovery_journal(struct recovery_journal_state_7_0 state,nonce_t nonce,struct vdo * vdo,struct partition * partition,u64 recovery_count,block_count_t journal_size,struct recovery_journal ** journal_ptr)705 int vdo_decode_recovery_journal(struct recovery_journal_state_7_0 state, nonce_t nonce,
706 				struct vdo *vdo, struct partition *partition,
707 				u64 recovery_count, block_count_t journal_size,
708 				struct recovery_journal **journal_ptr)
709 {
710 	block_count_t i;
711 	struct recovery_journal *journal;
712 	int result;
713 
714 	result = vdo_allocate_extended(struct recovery_journal,
715 				       RECOVERY_JOURNAL_RESERVED_BLOCKS,
716 				       struct recovery_journal_block, __func__,
717 				       &journal);
718 	if (result != VDO_SUCCESS)
719 		return result;
720 
721 	INIT_LIST_HEAD(&journal->free_tail_blocks);
722 	INIT_LIST_HEAD(&journal->active_tail_blocks);
723 	vdo_waitq_init(&journal->pending_writes);
724 
725 	journal->thread_id = vdo->thread_config.journal_thread;
726 	journal->origin = partition->offset;
727 	journal->nonce = nonce;
728 	journal->recovery_count = compute_recovery_count_byte(recovery_count);
729 	journal->size = journal_size;
730 	journal->slab_journal_commit_threshold = (journal_size * 2) / 3;
731 	journal->logical_blocks_used = state.logical_blocks_used;
732 	journal->block_map_data_blocks = state.block_map_data_blocks;
733 	journal->entries_per_block = RECOVERY_JOURNAL_ENTRIES_PER_BLOCK;
734 	set_journal_tail(journal, state.journal_start);
735 	initialize_journal_state(journal);
736 	/* TODO: this will have to change if we make initial resume of a VDO a real resume */
737 	vdo_set_admin_state_code(&journal->state, VDO_ADMIN_STATE_SUSPENDED);
738 
739 	for (i = 0; i < RECOVERY_JOURNAL_RESERVED_BLOCKS; i++) {
740 		struct recovery_journal_block *block = &journal->blocks[i];
741 
742 		result = initialize_recovery_block(vdo, journal, block);
743 		if (result != VDO_SUCCESS) {
744 			vdo_free_recovery_journal(journal);
745 			return result;
746 		}
747 	}
748 
749 	result = initialize_lock_counter(journal, vdo);
750 	if (result != VDO_SUCCESS) {
751 		vdo_free_recovery_journal(journal);
752 		return result;
753 	}
754 
755 	result = create_metadata_vio(vdo, VIO_TYPE_RECOVERY_JOURNAL, VIO_PRIORITY_HIGH,
756 				     journal, NULL, &journal->flush_vio);
757 	if (result != VDO_SUCCESS) {
758 		vdo_free_recovery_journal(journal);
759 		return result;
760 	}
761 
762 	result = vdo_register_read_only_listener(vdo, journal,
763 						 notify_recovery_journal_of_read_only_mode,
764 						 journal->thread_id);
765 	if (result != VDO_SUCCESS) {
766 		vdo_free_recovery_journal(journal);
767 		return result;
768 	}
769 
770 	result = vdo_make_default_thread(vdo, journal->thread_id);
771 	if (result != VDO_SUCCESS) {
772 		vdo_free_recovery_journal(journal);
773 		return result;
774 	}
775 
776 	journal->flush_vio->completion.callback_thread_id = journal->thread_id;
777 	*journal_ptr = journal;
778 	return VDO_SUCCESS;
779 }
780 
/**
 * vdo_free_recovery_journal() - Free a recovery journal.
 * @journal: The recovery journal to free (may be NULL, or only partially constructed).
 */
void vdo_free_recovery_journal(struct recovery_journal *journal)
{
	block_count_t i;

	if (journal == NULL)
		return;

	/* vdo_free() tolerates NULL, so unallocated counter arrays are safe to pass. */
	vdo_free(vdo_forget(journal->lock_counter.logical_zone_counts));
	vdo_free(vdo_forget(journal->lock_counter.physical_zone_counts));
	vdo_free(vdo_forget(journal->lock_counter.journal_counters));
	vdo_free(vdo_forget(journal->lock_counter.journal_decrement_counts));
	vdo_free(vdo_forget(journal->lock_counter.logical_counters));
	vdo_free(vdo_forget(journal->lock_counter.physical_counters));
	free_vio(vdo_forget(journal->flush_vio));

	/*
	 * FIXME: eventually, the journal should be constructed in a quiescent state which
	 *        requires opening before use.
	 */
	if (!vdo_is_state_quiescent(&journal->state)) {
		VDO_ASSERT_LOG_ONLY(list_empty(&journal->active_tail_blocks),
				    "journal being freed has no active tail blocks");
	} else if (!vdo_is_state_saved(&journal->state) &&
		   !list_empty(&journal->active_tail_blocks)) {
		vdo_log_warning("journal being freed has uncommitted entries");
	}

	for (i = 0; i < RECOVERY_JOURNAL_RESERVED_BLOCKS; i++) {
		struct recovery_journal_block *block = &journal->blocks[i];

		/* Free the data buffer handed to the vio in initialize_recovery_block(). */
		vdo_free(vdo_forget(block->vio.data));
		free_vio_components(&block->vio);
	}

	vdo_free(journal);
}
821 
822 /**
823  * vdo_initialize_recovery_journal_post_repair() - Initialize the journal after a repair.
824  * @journal: The journal in question.
825  * @recovery_count: The number of completed recoveries.
826  * @tail: The new tail block sequence number.
827  * @logical_blocks_used: The new number of logical blocks used.
828  * @block_map_data_blocks: The new number of block map data blocks.
829  */
vdo_initialize_recovery_journal_post_repair(struct recovery_journal * journal,u64 recovery_count,sequence_number_t tail,block_count_t logical_blocks_used,block_count_t block_map_data_blocks)830 void vdo_initialize_recovery_journal_post_repair(struct recovery_journal *journal,
831 						 u64 recovery_count,
832 						 sequence_number_t tail,
833 						 block_count_t logical_blocks_used,
834 						 block_count_t block_map_data_blocks)
835 {
836 	set_journal_tail(journal, tail + 1);
837 	journal->recovery_count = compute_recovery_count_byte(recovery_count);
838 	initialize_journal_state(journal);
839 	journal->logical_blocks_used = logical_blocks_used;
840 	journal->block_map_data_blocks = block_map_data_blocks;
841 }
842 
843 /**
844  * vdo_get_journal_block_map_data_blocks_used() - Get the number of block map pages, allocated from
845  *                                                data blocks, currently in use.
846  * @journal: The journal in question.
847  *
848  * Return: The number of block map pages allocated from slabs.
849  */
vdo_get_journal_block_map_data_blocks_used(struct recovery_journal * journal)850 block_count_t vdo_get_journal_block_map_data_blocks_used(struct recovery_journal *journal)
851 {
852 	return journal->block_map_data_blocks;
853 }
854 
855 /**
856  * vdo_get_recovery_journal_thread_id() - Get the ID of a recovery journal's thread.
857  * @journal: The journal to query.
858  *
859  * Return: The ID of the journal's thread.
860  */
vdo_get_recovery_journal_thread_id(struct recovery_journal * journal)861 thread_id_t vdo_get_recovery_journal_thread_id(struct recovery_journal *journal)
862 {
863 	return journal->thread_id;
864 }
865 
866 /**
867  * vdo_open_recovery_journal() - Prepare the journal for new entries.
868  * @journal: The journal in question.
869  * @depot: The slab depot for this VDO.
870  * @block_map: The block map for this VDO.
871  */
vdo_open_recovery_journal(struct recovery_journal * journal,struct slab_depot * depot,struct block_map * block_map)872 void vdo_open_recovery_journal(struct recovery_journal *journal,
873 			       struct slab_depot *depot, struct block_map *block_map)
874 {
875 	journal->depot = depot;
876 	journal->block_map = block_map;
877 	WRITE_ONCE(journal->state.current_state, VDO_ADMIN_STATE_NORMAL_OPERATION);
878 }
879 
880 /**
881  * vdo_record_recovery_journal() - Record the state of a recovery journal for encoding in the super
882  *                                 block.
883  * @journal: the recovery journal.
884  *
885  * Return: the state of the journal.
886  */
887 struct recovery_journal_state_7_0
vdo_record_recovery_journal(const struct recovery_journal * journal)888 vdo_record_recovery_journal(const struct recovery_journal *journal)
889 {
890 	struct recovery_journal_state_7_0 state = {
891 		.logical_blocks_used = journal->logical_blocks_used,
892 		.block_map_data_blocks = journal->block_map_data_blocks,
893 	};
894 
895 	if (vdo_is_state_saved(&journal->state)) {
896 		/*
897 		 * If the journal is saved, we should start one past the active block (since the
898 		 * active block is not guaranteed to be empty).
899 		 */
900 		state.journal_start = journal->tail;
901 	} else {
902 		/*
903 		 * When we're merely suspended or have gone read-only, we must record the first
904 		 * block that might have entries that need to be applied.
905 		 */
906 		state.journal_start = get_recovery_journal_head(journal);
907 	}
908 
909 	return state;
910 }
911 
912 /**
913  * get_block_header() - Get a pointer to the packed journal block header in the block buffer.
914  * @block: The recovery block.
915  *
916  * Return: The block's header.
917  */
918 static inline struct packed_journal_header *
get_block_header(const struct recovery_journal_block * block)919 get_block_header(const struct recovery_journal_block *block)
920 {
921 	return (struct packed_journal_header *) block->vio.data;
922 }
923 
924 /**
925  * set_active_sector() - Set the current sector of the current block and initialize it.
926  * @block: The block to update.
927  * @sector: A pointer to the first byte of the new sector.
928  */
set_active_sector(struct recovery_journal_block * block,void * sector)929 static void set_active_sector(struct recovery_journal_block *block, void *sector)
930 {
931 	block->sector = sector;
932 	block->sector->check_byte = get_block_header(block)->check_byte;
933 	block->sector->recovery_count = block->journal->recovery_count;
934 	block->sector->entry_count = 0;
935 }
936 
937 /**
938  * advance_tail() - Advance the tail of the journal.
939  * @journal: The journal whose tail should be advanced.
940  *
941  * Return: true if the tail was advanced.
942  */
advance_tail(struct recovery_journal * journal)943 static bool advance_tail(struct recovery_journal *journal)
944 {
945 	struct recovery_block_header unpacked;
946 	struct packed_journal_header *header;
947 	struct recovery_journal_block *block;
948 
949 	block = journal->active_block = pop_free_list(journal);
950 	if (block == NULL)
951 		return false;
952 
953 	list_move_tail(&block->list_node, &journal->active_tail_blocks);
954 
955 	unpacked = (struct recovery_block_header) {
956 		.metadata_type = VDO_METADATA_RECOVERY_JOURNAL_2,
957 		.block_map_data_blocks = journal->block_map_data_blocks,
958 		.logical_blocks_used = journal->logical_blocks_used,
959 		.nonce = journal->nonce,
960 		.recovery_count = journal->recovery_count,
961 		.sequence_number = journal->tail,
962 		.check_byte = vdo_compute_recovery_journal_check_byte(journal,
963 								      journal->tail),
964 	};
965 
966 	header = get_block_header(block);
967 	memset(block->vio.data, 0x0, VDO_BLOCK_SIZE);
968 	block->sequence_number = journal->tail;
969 	block->entry_count = 0;
970 	block->uncommitted_entry_count = 0;
971 	block->block_number = vdo_get_recovery_journal_block_number(journal,
972 								    journal->tail);
973 
974 	vdo_pack_recovery_block_header(&unpacked, header);
975 	set_active_sector(block, vdo_get_journal_block_sector(header, 1));
976 	set_journal_tail(journal, journal->tail + 1);
977 	vdo_advance_block_map_era(journal->block_map, journal->tail);
978 	return true;
979 }
980 
981 /**
982  * initialize_lock_count() - Initialize the value of the journal zone's counter for a given lock.
983  * @journal: The recovery journal.
984  *
985  * Context: This must be called from the journal zone.
986  */
initialize_lock_count(struct recovery_journal * journal)987 static void initialize_lock_count(struct recovery_journal *journal)
988 {
989 	u16 *journal_value;
990 	block_count_t lock_number = journal->active_block->block_number;
991 	atomic_t *decrement_counter = get_decrement_counter(journal, lock_number);
992 
993 	journal_value = get_counter(journal, lock_number, VDO_ZONE_TYPE_JOURNAL, 0);
994 	VDO_ASSERT_LOG_ONLY((*journal_value == atomic_read(decrement_counter)),
995 			    "count to be initialized not in use");
996 	*journal_value = journal->entries_per_block + 1;
997 	atomic_set(decrement_counter, 0);
998 }
999 
1000 /**
1001  * prepare_to_assign_entry() - Prepare the currently active block to receive an entry and check
1002  *			       whether an entry of the given type may be assigned at this time.
1003  * @journal: The journal receiving an entry.
1004  *
1005  * Return: true if there is space in the journal to store an entry of the specified type.
1006  */
prepare_to_assign_entry(struct recovery_journal * journal)1007 static bool prepare_to_assign_entry(struct recovery_journal *journal)
1008 {
1009 	if (journal->available_space == 0)
1010 		return false;
1011 
1012 	if (is_block_full(journal->active_block) && !advance_tail(journal))
1013 		return false;
1014 
1015 	if (!is_block_empty(journal->active_block))
1016 		return true;
1017 
1018 	if ((journal->tail - get_recovery_journal_head(journal)) > journal->size) {
1019 		/* Cannot use this block since the journal is full. */
1020 		journal->events.disk_full++;
1021 		return false;
1022 	}
1023 
1024 	/*
1025 	 * Don't allow the new block to be reaped until all of its entries have been committed to
1026 	 * the block map and until the journal block has been fully committed as well. Because the
1027 	 * block map update is done only after any slab journal entries have been made, the
1028 	 * per-entry lock for the block map entry serves to protect those as well.
1029 	 */
1030 	initialize_lock_count(journal);
1031 	return true;
1032 }
1033 
1034 static void write_blocks(struct recovery_journal *journal);
1035 
1036 /**
1037  * schedule_block_write() - Queue a block for writing.
1038  * @journal: The journal in question.
1039  * @block: The block which is now ready to write.
1040  *
1041  * The block is expected to be full. If the block is currently writing, this is a noop as the block
1042  * will be queued for writing when the write finishes. The block must not currently be queued for
1043  * writing.
1044  */
schedule_block_write(struct recovery_journal * journal,struct recovery_journal_block * block)1045 static void schedule_block_write(struct recovery_journal *journal,
1046 				 struct recovery_journal_block *block)
1047 {
1048 	if (!block->committing)
1049 		vdo_waitq_enqueue_waiter(&journal->pending_writes, &block->write_waiter);
1050 	/*
1051 	 * At the end of adding entries, or discovering this partial block is now full and ready to
1052 	 * rewrite, we will call write_blocks() and write a whole batch.
1053 	 */
1054 }
1055 
1056 /**
1057  * release_journal_block_reference() - Release a reference to a journal block.
1058  * @block: The journal block from which to release a reference.
1059  */
release_journal_block_reference(struct recovery_journal_block * block)1060 static void release_journal_block_reference(struct recovery_journal_block *block)
1061 {
1062 	vdo_release_recovery_journal_block_reference(block->journal,
1063 						     block->sequence_number,
1064 						     VDO_ZONE_TYPE_JOURNAL, 0);
1065 }
1066 
update_usages(struct recovery_journal * journal,struct data_vio * data_vio)1067 static void update_usages(struct recovery_journal *journal, struct data_vio *data_vio)
1068 {
1069 	if (data_vio->increment_updater.operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
1070 		journal->block_map_data_blocks++;
1071 		return;
1072 	}
1073 
1074 	if (data_vio->new_mapped.state != VDO_MAPPING_STATE_UNMAPPED)
1075 		journal->logical_blocks_used++;
1076 
1077 	if (data_vio->mapped.state != VDO_MAPPING_STATE_UNMAPPED)
1078 		journal->logical_blocks_used--;
1079 }
1080 
1081 /**
1082  * assign_entry() - Assign an entry waiter to the active block.
1083  * @waiter: The data_vio.
1084  * @context: The recovery journal block.
1085  *
1086  * Implements waiter_callback_fn.
1087  */
assign_entry(struct vdo_waiter * waiter,void * context)1088 static void assign_entry(struct vdo_waiter *waiter, void *context)
1089 {
1090 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1091 	struct recovery_journal_block *block = context;
1092 	struct recovery_journal *journal = block->journal;
1093 
1094 	/* Record the point at which we will make the journal entry. */
1095 	data_vio->recovery_journal_point = (struct journal_point) {
1096 		.sequence_number = block->sequence_number,
1097 		.entry_count = block->entry_count,
1098 	};
1099 
1100 	update_usages(journal, data_vio);
1101 	journal->available_space--;
1102 
1103 	if (!vdo_waitq_has_waiters(&block->entry_waiters))
1104 		journal->events.blocks.started++;
1105 
1106 	vdo_waitq_enqueue_waiter(&block->entry_waiters, &data_vio->waiter);
1107 	block->entry_count++;
1108 	block->uncommitted_entry_count++;
1109 	journal->events.entries.started++;
1110 
1111 	if (is_block_full(block)) {
1112 		/*
1113 		 * The block is full, so we can write it anytime henceforth. If it is already
1114 		 * committing, we'll queue it for writing when it comes back.
1115 		 */
1116 		schedule_block_write(journal, block);
1117 	}
1118 
1119 	/* Force out slab journal tail blocks when threshold is reached. */
1120 	check_slab_journal_commit_threshold(journal);
1121 }
1122 
/**
 * assign_entries() - Assign as many waiting entries as possible to the journal.
 * @journal: The recovery journal.
 *
 * Assigns waiters to the active block until either no waiters remain or no more entries
 * can currently be accepted, then issues any batch of blocks that became ready to write.
 */
static void assign_entries(struct recovery_journal *journal)
{
	if (journal->adding_entries) {
		/* Protect against re-entrancy. */
		return;
	}

	journal->adding_entries = true;
	while (vdo_waitq_has_waiters(&journal->entry_waiters) &&
	       prepare_to_assign_entry(journal)) {
		vdo_waitq_notify_next_waiter(&journal->entry_waiters,
					     assign_entry, journal->active_block);
	}

	/* Now that we've finished with entries, see if we have a batch of blocks to write. */
	write_blocks(journal);
	journal->adding_entries = false;
}
1141 
1142 /**
1143  * recycle_journal_block() - Prepare an in-memory journal block to be reused now that it has been
1144  *                           fully committed.
1145  * @block: The block to be recycled.
1146  */
recycle_journal_block(struct recovery_journal_block * block)1147 static void recycle_journal_block(struct recovery_journal_block *block)
1148 {
1149 	struct recovery_journal *journal = block->journal;
1150 	block_count_t i;
1151 
1152 	list_move_tail(&block->list_node, &journal->free_tail_blocks);
1153 
1154 	/* Release any unused entry locks. */
1155 	for (i = block->entry_count; i < journal->entries_per_block; i++)
1156 		release_journal_block_reference(block);
1157 
1158 	/*
1159 	 * Release our own lock against reaping now that the block is completely committed, or
1160 	 * we're giving up because we're in read-only mode.
1161 	 */
1162 	if (block->entry_count > 0)
1163 		release_journal_block_reference(block);
1164 
1165 	if (block == journal->active_block)
1166 		journal->active_block = NULL;
1167 }
1168 
1169 /**
1170  * continue_committed_waiter() - invoked whenever a VIO is to be released from the journal because
1171  *                               its entry was committed to disk.
1172  * @waiter: The data_vio waiting on a journal write.
1173  * @context: A pointer to the recovery journal.
1174  *
1175  * Implements waiter_callback_fn.
1176  */
continue_committed_waiter(struct vdo_waiter * waiter,void * context)1177 static void continue_committed_waiter(struct vdo_waiter *waiter, void *context)
1178 {
1179 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1180 	struct recovery_journal *journal = context;
1181 	int result = (is_read_only(journal) ? VDO_READ_ONLY : VDO_SUCCESS);
1182 	bool has_decrement;
1183 
1184 	VDO_ASSERT_LOG_ONLY(vdo_before_journal_point(&journal->commit_point,
1185 						     &data_vio->recovery_journal_point),
1186 			    "DataVIOs released from recovery journal in order. Recovery journal point is (%llu, %u), but commit waiter point is (%llu, %u)",
1187 			    (unsigned long long) journal->commit_point.sequence_number,
1188 			    journal->commit_point.entry_count,
1189 			    (unsigned long long) data_vio->recovery_journal_point.sequence_number,
1190 			    data_vio->recovery_journal_point.entry_count);
1191 
1192 	journal->commit_point = data_vio->recovery_journal_point;
1193 	data_vio->last_async_operation = VIO_ASYNC_OP_UPDATE_REFERENCE_COUNTS;
1194 	if (result != VDO_SUCCESS) {
1195 		continue_data_vio_with_error(data_vio, result);
1196 		return;
1197 	}
1198 
1199 	/*
1200 	 * The increment must be launched first since it must come before the
1201 	 * decrement if they are in the same slab.
1202 	 */
1203 	has_decrement = (data_vio->decrement_updater.zpbn.pbn != VDO_ZERO_BLOCK);
1204 	if ((data_vio->increment_updater.zpbn.pbn != VDO_ZERO_BLOCK) || !has_decrement)
1205 		continue_data_vio(data_vio);
1206 
1207 	if (has_decrement)
1208 		vdo_launch_completion(&data_vio->decrement_completion);
1209 }
1210 
1211 /**
1212  * notify_commit_waiters() - Notify any VIOs whose entries have now committed.
1213  * @journal: The recovery journal to update.
1214  */
notify_commit_waiters(struct recovery_journal * journal)1215 static void notify_commit_waiters(struct recovery_journal *journal)
1216 {
1217 	struct recovery_journal_block *block;
1218 
1219 	list_for_each_entry(block, &journal->active_tail_blocks, list_node) {
1220 		if (block->committing)
1221 			return;
1222 
1223 		vdo_waitq_notify_all_waiters(&block->commit_waiters,
1224 					     continue_committed_waiter, journal);
1225 		if (is_read_only(journal)) {
1226 			vdo_waitq_notify_all_waiters(&block->entry_waiters,
1227 						     continue_committed_waiter,
1228 						     journal);
1229 		} else if (is_block_dirty(block) || !is_block_full(block)) {
1230 			/* Stop at partially-committed or partially-filled blocks. */
1231 			return;
1232 		}
1233 	}
1234 }
1235 
1236 /**
1237  * recycle_journal_blocks() - Recycle any journal blocks which have been fully committed.
1238  * @journal: The recovery journal to update.
1239  */
recycle_journal_blocks(struct recovery_journal * journal)1240 static void recycle_journal_blocks(struct recovery_journal *journal)
1241 {
1242 	struct recovery_journal_block *block, *tmp;
1243 
1244 	list_for_each_entry_safe(block, tmp, &journal->active_tail_blocks, list_node) {
1245 		if (block->committing) {
1246 			/* Don't recycle committing blocks. */
1247 			return;
1248 		}
1249 
1250 		if (!is_read_only(journal) &&
1251 		    (is_block_dirty(block) || !is_block_full(block))) {
1252 			/*
1253 			 * Don't recycle partially written or partially full blocks, except in
1254 			 * read-only mode.
1255 			 */
1256 			return;
1257 		}
1258 
1259 		recycle_journal_block(block);
1260 	}
1261 }
1262 
1263 /**
1264  * complete_write() - Handle post-commit processing.
1265  * @completion: The completion of the VIO writing this block.
1266  *
1267  * This is the callback registered by write_block(). If more entries accumulated in the block being
1268  * committed while the commit was in progress, another commit will be initiated.
1269  */
complete_write(struct vdo_completion * completion)1270 static void complete_write(struct vdo_completion *completion)
1271 {
1272 	struct recovery_journal_block *block = completion->parent;
1273 	struct recovery_journal *journal = block->journal;
1274 	struct recovery_journal_block *last_active_block;
1275 
1276 	assert_on_journal_thread(journal, __func__);
1277 
1278 	journal->pending_write_count -= 1;
1279 	journal->events.blocks.committed += 1;
1280 	journal->events.entries.committed += block->entries_in_commit;
1281 	block->uncommitted_entry_count -= block->entries_in_commit;
1282 	block->entries_in_commit = 0;
1283 	block->committing = false;
1284 
1285 	/* If this block is the latest block to be acknowledged, record that fact. */
1286 	if (block->sequence_number > journal->last_write_acknowledged)
1287 		journal->last_write_acknowledged = block->sequence_number;
1288 
1289 	last_active_block = get_journal_block(&journal->active_tail_blocks);
1290 	VDO_ASSERT_LOG_ONLY((block->sequence_number >= last_active_block->sequence_number),
1291 			    "completed journal write is still active");
1292 
1293 	notify_commit_waiters(journal);
1294 
1295 	/*
1296 	 * Is this block now full? Reaping, and adding entries, might have already sent it off for
1297 	 * rewriting; else, queue it for rewrite.
1298 	 */
1299 	if (is_block_dirty(block) && is_block_full(block))
1300 		schedule_block_write(journal, block);
1301 
1302 	recycle_journal_blocks(journal);
1303 	write_blocks(journal);
1304 
1305 	check_for_drain_complete(journal);
1306 }
1307 
handle_write_error(struct vdo_completion * completion)1308 static void handle_write_error(struct vdo_completion *completion)
1309 {
1310 	struct recovery_journal_block *block = completion->parent;
1311 	struct recovery_journal *journal = block->journal;
1312 
1313 	vio_record_metadata_io_error(as_vio(completion));
1314 	vdo_log_error_strerror(completion->result,
1315 			       "cannot write recovery journal block %llu",
1316 			       (unsigned long long) block->sequence_number);
1317 	enter_journal_read_only_mode(journal, completion->result);
1318 	complete_write(completion);
1319 }
1320 
complete_write_endio(struct bio * bio)1321 static void complete_write_endio(struct bio *bio)
1322 {
1323 	struct vio *vio = bio->bi_private;
1324 	struct recovery_journal_block *block = vio->completion.parent;
1325 	struct recovery_journal *journal = block->journal;
1326 
1327 	continue_vio_after_io(vio, complete_write, journal->thread_id);
1328 }
1329 
1330 /**
1331  * add_queued_recovery_entries() - Actually add entries from the queue to the given block.
1332  * @block: The journal block.
1333  */
add_queued_recovery_entries(struct recovery_journal_block * block)1334 static void add_queued_recovery_entries(struct recovery_journal_block *block)
1335 {
1336 	while (vdo_waitq_has_waiters(&block->entry_waiters)) {
1337 		struct data_vio *data_vio =
1338 			vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&block->entry_waiters));
1339 		struct tree_lock *lock = &data_vio->tree_lock;
1340 		struct packed_recovery_journal_entry *packed_entry;
1341 		struct recovery_journal_entry new_entry;
1342 
1343 		if (block->sector->entry_count == RECOVERY_JOURNAL_ENTRIES_PER_SECTOR)
1344 			set_active_sector(block,
1345 					  (char *) block->sector + VDO_SECTOR_SIZE);
1346 
1347 		/* Compose and encode the entry. */
1348 		packed_entry = &block->sector->entries[block->sector->entry_count++];
1349 		new_entry = (struct recovery_journal_entry) {
1350 			.mapping = {
1351 				.pbn = data_vio->increment_updater.zpbn.pbn,
1352 				.state = data_vio->increment_updater.zpbn.state,
1353 			},
1354 			.unmapping = {
1355 				.pbn = data_vio->decrement_updater.zpbn.pbn,
1356 				.state = data_vio->decrement_updater.zpbn.state,
1357 			},
1358 			.operation = data_vio->increment_updater.operation,
1359 			.slot = lock->tree_slots[lock->height].block_map_slot,
1360 		};
1361 		*packed_entry = vdo_pack_recovery_journal_entry(&new_entry);
1362 		data_vio->recovery_sequence_number = block->sequence_number;
1363 
1364 		/* Enqueue the data_vio to wait for its entry to commit. */
1365 		vdo_waitq_enqueue_waiter(&block->commit_waiters, &data_vio->waiter);
1366 	}
1367 }
1368 
1369 /**
1370  * write_block() - Issue a block for writing.
1371  * @waiter: The recovery journal block to write.
1372  * @context: Not used.
1373  *
1374  * Implements waiter_callback_fn.
1375  */
write_block(struct vdo_waiter * waiter,void __always_unused * context)1376 static void write_block(struct vdo_waiter *waiter, void __always_unused *context)
1377 {
1378 	struct recovery_journal_block *block =
1379 		container_of(waiter, struct recovery_journal_block, write_waiter);
1380 	struct recovery_journal *journal = block->journal;
1381 	struct packed_journal_header *header = get_block_header(block);
1382 
1383 	if (block->committing || !vdo_waitq_has_waiters(&block->entry_waiters) ||
1384 	    is_read_only(journal))
1385 		return;
1386 
1387 	block->entries_in_commit = vdo_waitq_num_waiters(&block->entry_waiters);
1388 	add_queued_recovery_entries(block);
1389 
1390 	journal->pending_write_count += 1;
1391 	journal->events.blocks.written += 1;
1392 	journal->events.entries.written += block->entries_in_commit;
1393 
1394 	header->block_map_head = __cpu_to_le64(journal->block_map_head);
1395 	header->slab_journal_head = __cpu_to_le64(journal->slab_journal_head);
1396 	header->entry_count = __cpu_to_le16(block->entry_count);
1397 
1398 	block->committing = true;
1399 
1400 	/*
1401 	 * We must issue a flush and a FUA for every commit. The flush is necessary to ensure that
1402 	 * the data being referenced is stable. The FUA is necessary to ensure that the journal
1403 	 * block itself is stable before allowing overwrites of the lbn's previous data.
1404 	 */
1405 	vdo_submit_metadata_vio(&block->vio, journal->origin + block->block_number,
1406 				complete_write_endio, handle_write_error,
1407 				REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH | REQ_SYNC | REQ_FUA);
1408 }
1409 
1410 
1411 /**
1412  * write_blocks() - Attempt to commit blocks, according to write policy.
1413  * @journal: The recovery journal.
1414  */
write_blocks(struct recovery_journal * journal)1415 static void write_blocks(struct recovery_journal *journal)
1416 {
1417 	assert_on_journal_thread(journal, __func__);
1418 	/*
1419 	 * We call this function after adding entries to the journal and after finishing a block
1420 	 * write. Thus, when this function terminates we must either have no VIOs waiting in the
1421 	 * journal or have some outstanding IO to provide a future wakeup.
1422 	 *
1423 	 * We want to only issue full blocks if there are no pending writes. However, if there are
1424 	 * no outstanding writes and some unwritten entries, we must issue a block, even if it's
1425 	 * the active block and it isn't full.
1426 	 */
1427 	if (journal->pending_write_count > 0)
1428 		return;
1429 
1430 	/* Write all the full blocks. */
1431 	vdo_waitq_notify_all_waiters(&journal->pending_writes, write_block, NULL);
1432 
1433 	/*
1434 	 * Do we need to write the active block? Only if we have no outstanding writes, even after
1435 	 * issuing all of the full writes.
1436 	 */
1437 	if ((journal->pending_write_count == 0) && (journal->active_block != NULL))
1438 		write_block(&journal->active_block->write_waiter, NULL);
1439 }
1440 
1441 /**
1442  * vdo_add_recovery_journal_entry() - Add an entry to a recovery journal.
1443  * @journal: The journal in which to make an entry.
1444  * @data_vio: The data_vio for which to add the entry. The entry will be taken
1445  *	      from the logical and new_mapped fields of the data_vio. The
1446  *	      data_vio's recovery_sequence_number field will be set to the
1447  *	      sequence number of the journal block in which the entry was
1448  *	      made.
1449  *
1450  * This method is asynchronous. The data_vio will not be called back until the entry is committed
1451  * to the on-disk journal.
1452  */
vdo_add_recovery_journal_entry(struct recovery_journal * journal,struct data_vio * data_vio)1453 void vdo_add_recovery_journal_entry(struct recovery_journal *journal,
1454 				    struct data_vio *data_vio)
1455 {
1456 	assert_on_journal_thread(journal, __func__);
1457 	if (!vdo_is_state_normal(&journal->state)) {
1458 		continue_data_vio_with_error(data_vio, VDO_INVALID_ADMIN_STATE);
1459 		return;
1460 	}
1461 
1462 	if (is_read_only(journal)) {
1463 		continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
1464 		return;
1465 	}
1466 
1467 	VDO_ASSERT_LOG_ONLY(data_vio->recovery_sequence_number == 0,
1468 			    "journal lock not held for new entry");
1469 
1470 	vdo_advance_journal_point(&journal->append_point, journal->entries_per_block);
1471 	vdo_waitq_enqueue_waiter(&journal->entry_waiters, &data_vio->waiter);
1472 	assign_entries(journal);
1473 }
1474 
1475 /**
1476  * is_lock_locked() - Check whether a lock is locked for a zone type.
1477  * @journal: The recovery journal.
1478  * @lock_number: The lock to check.
1479  * @zone_type: The type of the zone.
1480  *
1481  * If the recovery journal has a lock on the lock number, both logical and physical zones are
1482  * considered locked.
1483  *
1484  * Return: true if the specified lock has references (is locked).
1485  */
is_lock_locked(struct recovery_journal * journal,block_count_t lock_number,enum vdo_zone_type zone_type)1486 static bool is_lock_locked(struct recovery_journal *journal, block_count_t lock_number,
1487 			   enum vdo_zone_type zone_type)
1488 {
1489 	atomic_t *zone_count;
1490 	bool locked;
1491 
1492 	if (is_journal_zone_locked(journal, lock_number))
1493 		return true;
1494 
1495 	zone_count = get_zone_count_ptr(journal, lock_number, zone_type);
1496 	locked = (atomic_read(zone_count) != 0);
1497 	/* Pairs with implicit barrier in vdo_release_recovery_journal_block_reference() */
1498 	smp_rmb();
1499 	return locked;
1500 }
1501 
1502 /**
1503  * reap_recovery_journal() - Conduct a sweep on a recovery journal to reclaim unreferenced blocks.
1504  * @journal: The recovery journal.
1505  */
reap_recovery_journal(struct recovery_journal * journal)1506 static void reap_recovery_journal(struct recovery_journal *journal)
1507 {
1508 	if (journal->reaping) {
1509 		/*
1510 		 * We already have an outstanding reap in progress. We need to wait for it to
1511 		 * finish.
1512 		 */
1513 		return;
1514 	}
1515 
1516 	if (vdo_is_state_quiescent(&journal->state)) {
1517 		/* We are supposed to not do IO. Don't botch it by reaping. */
1518 		return;
1519 	}
1520 
1521 	/*
1522 	 * Start reclaiming blocks only when the journal head has no references. Then stop when a
1523 	 * block is referenced.
1524 	 */
1525 	while ((journal->block_map_reap_head < journal->last_write_acknowledged) &&
1526 		!is_lock_locked(journal, journal->block_map_head_block_number,
1527 				VDO_ZONE_TYPE_LOGICAL)) {
1528 		journal->block_map_reap_head++;
1529 		if (++journal->block_map_head_block_number == journal->size)
1530 			journal->block_map_head_block_number = 0;
1531 	}
1532 
1533 	while ((journal->slab_journal_reap_head < journal->last_write_acknowledged) &&
1534 		!is_lock_locked(journal, journal->slab_journal_head_block_number,
1535 				VDO_ZONE_TYPE_PHYSICAL)) {
1536 		journal->slab_journal_reap_head++;
1537 		if (++journal->slab_journal_head_block_number == journal->size)
1538 			journal->slab_journal_head_block_number = 0;
1539 	}
1540 
1541 	if ((journal->block_map_reap_head == journal->block_map_head) &&
1542 	    (journal->slab_journal_reap_head == journal->slab_journal_head)) {
1543 		/* Nothing happened. */
1544 		return;
1545 	}
1546 
1547 	/*
1548 	 * If the block map head will advance, we must flush any block map page modified by the
1549 	 * entries we are reaping. If the slab journal head will advance, we must flush the slab
1550 	 * summary update covering the slab journal that just released some lock.
1551 	 */
1552 	journal->reaping = true;
1553 	vdo_submit_flush_vio(journal->flush_vio, flush_endio, handle_flush_error);
1554 }
1555 
1556 /**
1557  * vdo_acquire_recovery_journal_block_reference() - Acquire a reference to a recovery journal block
1558  *                                                  from somewhere other than the journal itself.
1559  * @journal: The recovery journal.
1560  * @sequence_number: The journal sequence number of the referenced block.
1561  * @zone_type: The type of the zone making the adjustment.
1562  * @zone_id: The ID of the zone making the adjustment.
1563  */
vdo_acquire_recovery_journal_block_reference(struct recovery_journal * journal,sequence_number_t sequence_number,enum vdo_zone_type zone_type,zone_count_t zone_id)1564 void vdo_acquire_recovery_journal_block_reference(struct recovery_journal *journal,
1565 						  sequence_number_t sequence_number,
1566 						  enum vdo_zone_type zone_type,
1567 						  zone_count_t zone_id)
1568 {
1569 	block_count_t lock_number;
1570 	u16 *current_value;
1571 
1572 	if (sequence_number == 0)
1573 		return;
1574 
1575 	VDO_ASSERT_LOG_ONLY((zone_type != VDO_ZONE_TYPE_JOURNAL),
1576 			    "invalid lock count increment from journal zone");
1577 
1578 	lock_number = vdo_get_recovery_journal_block_number(journal, sequence_number);
1579 	current_value = get_counter(journal, lock_number, zone_type, zone_id);
1580 	VDO_ASSERT_LOG_ONLY(*current_value < U16_MAX,
1581 			    "increment of lock counter must not overflow");
1582 
1583 	if (*current_value == 0) {
1584 		/*
1585 		 * This zone is acquiring this lock for the first time. Extra barriers because this
1586 		 * was original developed using an atomic add operation that implicitly had them.
1587 		 */
1588 		smp_mb__before_atomic();
1589 		atomic_inc(get_zone_count_ptr(journal, lock_number, zone_type));
1590 		/* same as before_atomic */
1591 		smp_mb__after_atomic();
1592 	}
1593 
1594 	*current_value += 1;
1595 }
1596 
1597 /**
1598  * vdo_release_journal_entry_lock() - Release a single per-entry reference count for a recovery
1599  *                                    journal block.
1600  * @journal: The recovery journal.
1601  * @sequence_number: The journal sequence number of the referenced block.
1602  */
vdo_release_journal_entry_lock(struct recovery_journal * journal,sequence_number_t sequence_number)1603 void vdo_release_journal_entry_lock(struct recovery_journal *journal,
1604 				    sequence_number_t sequence_number)
1605 {
1606 	block_count_t lock_number;
1607 
1608 	if (sequence_number == 0)
1609 		return;
1610 
1611 	lock_number = vdo_get_recovery_journal_block_number(journal, sequence_number);
1612 	/*
1613 	 * Extra barriers because this was originally developed using an atomic add operation that
1614 	 * implicitly had them.
1615 	 */
1616 	smp_mb__before_atomic();
1617 	atomic_inc(get_decrement_counter(journal, lock_number));
1618 	/* same as before_atomic */
1619 	smp_mb__after_atomic();
1620 }
1621 
1622 /** Implements vdo_admin_initiator_fn. */
initiate_drain(struct admin_state * state)1623 static void initiate_drain(struct admin_state *state)
1624 {
1625 	check_for_drain_complete(container_of(state, struct recovery_journal, state));
1626 }
1627 
1628 /**
1629  * vdo_drain_recovery_journal() - Drain recovery journal I/O.
1630  * @journal: The journal to drain.
1631  * @operation: The drain operation (suspend or save).
1632  * @parent: The completion to notify once the journal is drained.
1633  *
1634  * All uncommitted entries will be written out.
1635  */
vdo_drain_recovery_journal(struct recovery_journal * journal,const struct admin_state_code * operation,struct vdo_completion * parent)1636 void vdo_drain_recovery_journal(struct recovery_journal *journal,
1637 				const struct admin_state_code *operation,
1638 				struct vdo_completion *parent)
1639 {
1640 	assert_on_journal_thread(journal, __func__);
1641 	vdo_start_draining(&journal->state, operation, parent, initiate_drain);
1642 }
1643 
1644 /**
1645  * resume_lock_counter() - Re-allow notifications from a suspended lock counter.
1646  * @counter: The counter.
1647  *
1648  * Return: true if the lock counter was suspended.
1649  */
resume_lock_counter(struct lock_counter * counter)1650 static bool resume_lock_counter(struct lock_counter *counter)
1651 {
1652 	int prior_state;
1653 
1654 	/*
1655 	 * Extra barriers because this was original developed using a CAS operation that implicitly
1656 	 * had them.
1657 	 */
1658 	smp_mb__before_atomic();
1659 	prior_state = atomic_cmpxchg(&counter->state, LOCK_COUNTER_STATE_SUSPENDED,
1660 				     LOCK_COUNTER_STATE_NOT_NOTIFYING);
1661 	/* same as before_atomic */
1662 	smp_mb__after_atomic();
1663 
1664 	return (prior_state == LOCK_COUNTER_STATE_SUSPENDED);
1665 }
1666 
1667 /**
1668  * vdo_resume_recovery_journal() - Resume a recovery journal which has been drained.
1669  * @journal: The journal to resume.
1670  * @parent: The completion to finish once the journal is resumed.
1671  */
vdo_resume_recovery_journal(struct recovery_journal * journal,struct vdo_completion * parent)1672 void vdo_resume_recovery_journal(struct recovery_journal *journal,
1673 				 struct vdo_completion *parent)
1674 {
1675 	bool saved;
1676 
1677 	assert_on_journal_thread(journal, __func__);
1678 	saved = vdo_is_state_saved(&journal->state);
1679 	vdo_set_completion_result(parent, vdo_resume_if_quiescent(&journal->state));
1680 	if (is_read_only(journal)) {
1681 		vdo_continue_completion(parent, VDO_READ_ONLY);
1682 		return;
1683 	}
1684 
1685 	if (saved)
1686 		initialize_journal_state(journal);
1687 
1688 	if (resume_lock_counter(&journal->lock_counter)) {
1689 		/* We might have missed a notification. */
1690 		reap_recovery_journal(journal);
1691 	}
1692 
1693 	vdo_launch_completion(parent);
1694 }
1695 
1696 /**
1697  * vdo_get_recovery_journal_logical_blocks_used() - Get the number of logical blocks in use by the
1698  *                                                  VDO.
1699  * @journal: The journal.
1700  *
1701  * Return: The number of logical blocks in use by the VDO.
1702  */
vdo_get_recovery_journal_logical_blocks_used(const struct recovery_journal * journal)1703 block_count_t vdo_get_recovery_journal_logical_blocks_used(const struct recovery_journal *journal)
1704 {
1705 	return journal->logical_blocks_used;
1706 }
1707 
1708 /**
1709  * vdo_get_recovery_journal_statistics() - Get the current statistics from the recovery journal.
1710  * @journal: The recovery journal to query.
1711  *
1712  * Return: A copy of the current statistics for the journal.
1713  */
1714 struct recovery_journal_statistics
vdo_get_recovery_journal_statistics(const struct recovery_journal * journal)1715 vdo_get_recovery_journal_statistics(const struct recovery_journal *journal)
1716 {
1717 	return journal->events;
1718 }
1719 
1720 /**
1721  * dump_recovery_block() - Dump the contents of the recovery block to the log.
1722  * @block: The block to dump.
1723  */
dump_recovery_block(const struct recovery_journal_block * block)1724 static void dump_recovery_block(const struct recovery_journal_block *block)
1725 {
1726 	vdo_log_info("    sequence number %llu; entries %u; %s; %zu entry waiters; %zu commit waiters",
1727 		     (unsigned long long) block->sequence_number, block->entry_count,
1728 		     (block->committing ? "committing" : "waiting"),
1729 		     vdo_waitq_num_waiters(&block->entry_waiters),
1730 		     vdo_waitq_num_waiters(&block->commit_waiters));
1731 }
1732 
1733 /**
1734  * vdo_dump_recovery_journal_statistics() - Dump some current statistics and other debug info from
1735  *                                          the recovery journal.
1736  * @journal: The recovery journal to dump.
1737  */
vdo_dump_recovery_journal_statistics(const struct recovery_journal * journal)1738 void vdo_dump_recovery_journal_statistics(const struct recovery_journal *journal)
1739 {
1740 	const struct recovery_journal_block *block;
1741 	struct recovery_journal_statistics stats = vdo_get_recovery_journal_statistics(journal);
1742 
1743 	vdo_log_info("Recovery Journal");
1744 	vdo_log_info("	block_map_head=%llu slab_journal_head=%llu last_write_acknowledged=%llu tail=%llu block_map_reap_head=%llu slab_journal_reap_head=%llu disk_full=%llu slab_journal_commits_requested=%llu entry_waiters=%zu",
1745 		     (unsigned long long) journal->block_map_head,
1746 		     (unsigned long long) journal->slab_journal_head,
1747 		     (unsigned long long) journal->last_write_acknowledged,
1748 		     (unsigned long long) journal->tail,
1749 		     (unsigned long long) journal->block_map_reap_head,
1750 		     (unsigned long long) journal->slab_journal_reap_head,
1751 		     (unsigned long long) stats.disk_full,
1752 		     (unsigned long long) stats.slab_journal_commits_requested,
1753 		     vdo_waitq_num_waiters(&journal->entry_waiters));
1754 	vdo_log_info("	entries: started=%llu written=%llu committed=%llu",
1755 		     (unsigned long long) stats.entries.started,
1756 		     (unsigned long long) stats.entries.written,
1757 		     (unsigned long long) stats.entries.committed);
1758 	vdo_log_info("	blocks: started=%llu written=%llu committed=%llu",
1759 		     (unsigned long long) stats.blocks.started,
1760 		     (unsigned long long) stats.blocks.written,
1761 		     (unsigned long long) stats.blocks.committed);
1762 
1763 	vdo_log_info("	active blocks:");
1764 	list_for_each_entry(block, &journal->active_tail_blocks, list_node)
1765 		dump_recovery_block(block);
1766 }
1767