xref: /linux/drivers/md/dm-vdo/vdo.c (revision eb01fe7abbe2d0b38824d2a93fdb4cc3eaf2ccc1)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 /*
7  * This file contains the main entry points for normal operations on a vdo as well as functions for
8  * constructing and destroying vdo instances (in memory).
9  */
10 
11 /**
12  * DOC:
13  *
14  * A read_only_notifier has a single completion which is used to perform read-only notifications,
15  * however, vdo_enter_read_only_mode() may be called from any thread. A pair of fields, protected
16  * by a spinlock, are used to control the read-only mode entry process. The first field holds the
17  * read-only error. The second is the state field, which may hold any of the four special values
18  * enumerated here.
19  *
20  * When vdo_enter_read_only_mode() is called from some vdo thread, if the read_only_error field
21  * already contains an error (i.e. its value is not VDO_SUCCESS), then some other error has already
22  * initiated the read-only process, and nothing more is done. Otherwise, the new error is stored in
23  * the read_only_error field, and the state field is consulted. If the state is MAY_NOTIFY, it is
24  * set to NOTIFYING, and the notification process begins. If the state is MAY_NOT_NOTIFY, then
25  * notifications are currently disallowed, generally due to the vdo being suspended. In this case,
26  * nothing more will be done until the vdo is resumed, at which point the notification will be
27  * performed. In any other case, the vdo is already read-only, and there is nothing more to do.
28  */
29 
30 #include "vdo.h"
31 
32 #include <linux/completion.h>
33 #include <linux/device-mapper.h>
34 #include <linux/kernel.h>
35 #include <linux/lz4.h>
36 #include <linux/module.h>
37 #include <linux/mutex.h>
38 #include <linux/spinlock.h>
39 #include <linux/types.h>
40 
41 #include "logger.h"
42 #include "memory-alloc.h"
43 #include "permassert.h"
44 #include "string-utils.h"
45 
46 #include "block-map.h"
47 #include "completion.h"
48 #include "data-vio.h"
49 #include "dedupe.h"
50 #include "encodings.h"
51 #include "funnel-workqueue.h"
52 #include "io-submitter.h"
53 #include "logical-zone.h"
54 #include "packer.h"
55 #include "physical-zone.h"
56 #include "recovery-journal.h"
57 #include "slab-depot.h"
58 #include "statistics.h"
59 #include "status-codes.h"
60 #include "vio.h"
61 
/*
 * Compile-time switch, set to 0 (disabled) here.
 * NOTE(review): the code this gates is not visible in this part of the file; presumably it
 * enables extra thread-consistency assertions -- confirm against its users.
 */
#define PARANOID_THREAD_CONSISTENCY_CHECKS 0
63 
/*
 * Pairs a vdo_completion with a kernel completion.
 * NOTE(review): presumably lets a caller block until a vdo operation has finished; the users of
 * this structure are not visible in this part of the file -- confirm.
 */
struct sync_completion {
	struct vdo_completion vdo_completion;
	struct completion completion;
};
68 
/* A linked list is adequate for the small number of entries we expect. */
struct device_registry {
	/* Head of the list of registered vdos, linked through each vdo's registration field. */
	struct list_head links;
	/* TODO: Convert to rcu per kernel recommendation. */
	/* Taken for read while searching the list and for write while adding or removing a vdo. */
	rwlock_t lock;
};

/* The one registry shared by every vdo handled in this file. */
static struct device_registry registry;
77 
78 /**
79  * vdo_initialize_device_registry_once() - Initialize the necessary structures for the device
80  *                                         registry.
81  */
82 void vdo_initialize_device_registry_once(void)
83 {
84 	INIT_LIST_HEAD(&registry.links);
85 	rwlock_init(&registry.lock);
86 }
87 
/** vdo_is_equal() - Implements vdo_filter_fn; matches when @context is the vdo itself. */
static bool vdo_is_equal(struct vdo *vdo, const void *context)
{
	const void *candidate = vdo;

	return (candidate == context);
}
93 
94 /**
95  * filter_vdos_locked() - Find a vdo in the registry if it exists there.
96  * @filter: The filter function to apply to devices.
97  * @context: A bit of context to provide the filter.
98  *
99  * Context: Must be called holding the lock.
100  *
101  * Return: the vdo object found, if any.
102  */
103 static struct vdo * __must_check filter_vdos_locked(vdo_filter_fn filter,
104 						    const void *context)
105 {
106 	struct vdo *vdo;
107 
108 	list_for_each_entry(vdo, &registry.links, registration) {
109 		if (filter(vdo, context))
110 			return vdo;
111 	}
112 
113 	return NULL;
114 }
115 
116 /**
117  * vdo_find_matching() - Find and return the first (if any) vdo matching a given filter function.
118  * @filter: The filter function to apply to vdos.
119  * @context: A bit of context to provide the filter.
120  */
121 struct vdo *vdo_find_matching(vdo_filter_fn filter, const void *context)
122 {
123 	struct vdo *vdo;
124 
125 	read_lock(&registry.lock);
126 	vdo = filter_vdos_locked(filter, context);
127 	read_unlock(&registry.lock);
128 
129 	return vdo;
130 }
131 
132 static void start_vdo_request_queue(void *ptr)
133 {
134 	struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue());
135 
136 	vdo_register_allocating_thread(&thread->allocating_thread,
137 				       &thread->vdo->allocations_allowed);
138 }
139 
/*
 * Finish hook of default_queue_type; the counterpart of start_vdo_request_queue().
 * @ptr is unused.
 */
static void finish_vdo_request_queue(void *ptr)
{
	/* Takes no argument, so presumably acts on the calling thread -- TODO confirm. */
	vdo_unregister_allocating_thread();
}
144 
/*
 * The name used as the leading part of each vdo's thread name prefix (see vdo_make()): the
 * module's own name when built as a module, otherwise the fixed string "dm-vdo".
 */
#ifdef MODULE
#define MODULE_NAME THIS_MODULE->name
#else
#define MODULE_NAME "dm-vdo"
#endif  /* MODULE */
150 
151 static const struct vdo_work_queue_type default_queue_type = {
152 	.start = start_vdo_request_queue,
153 	.finish = finish_vdo_request_queue,
154 	.max_priority = VDO_DEFAULT_Q_MAX_PRIORITY,
155 	.default_priority = VDO_DEFAULT_Q_COMPLETION_PRIORITY,
156 };
157 
158 static const struct vdo_work_queue_type bio_ack_q_type = {
159 	.start = NULL,
160 	.finish = NULL,
161 	.max_priority = BIO_ACK_Q_MAX_PRIORITY,
162 	.default_priority = BIO_ACK_Q_ACK_PRIORITY,
163 };
164 
165 static const struct vdo_work_queue_type cpu_q_type = {
166 	.start = NULL,
167 	.finish = NULL,
168 	.max_priority = CPU_Q_MAX_PRIORITY,
169 	.default_priority = CPU_Q_MAX_PRIORITY,
170 };
171 
172 static void uninitialize_thread_config(struct thread_config *config)
173 {
174 	vdo_free(vdo_forget(config->logical_threads));
175 	vdo_free(vdo_forget(config->physical_threads));
176 	vdo_free(vdo_forget(config->hash_zone_threads));
177 	vdo_free(vdo_forget(config->bio_threads));
178 	memset(config, 0, sizeof(struct thread_config));
179 }
180 
181 static void assign_thread_ids(struct thread_config *config,
182 			      thread_id_t thread_ids[], zone_count_t count)
183 {
184 	zone_count_t zone;
185 
186 	for (zone = 0; zone < count; zone++)
187 		thread_ids[zone] = config->thread_count++;
188 }
189 
190 /**
191  * initialize_thread_config() - Initialize the thread mapping
192  *
193  * If the logical, physical, and hash zone counts are all 0, a single thread will be shared by all
194  * three plus the packer and recovery journal. Otherwise, there must be at least one of each type,
195  * and each will have its own thread, as will the packer and recovery journal.
196  *
197  * Return: VDO_SUCCESS or an error.
198  */
199 static int __must_check initialize_thread_config(struct thread_count_config counts,
200 						 struct thread_config *config)
201 {
202 	int result;
203 	bool single = ((counts.logical_zones + counts.physical_zones + counts.hash_zones) == 0);
204 
205 	config->bio_thread_count = counts.bio_threads;
206 	if (single) {
207 		config->logical_zone_count = 1;
208 		config->physical_zone_count = 1;
209 		config->hash_zone_count = 1;
210 	} else {
211 		config->logical_zone_count = counts.logical_zones;
212 		config->physical_zone_count = counts.physical_zones;
213 		config->hash_zone_count = counts.hash_zones;
214 	}
215 
216 	result = vdo_allocate(config->logical_zone_count, thread_id_t,
217 			      "logical thread array", &config->logical_threads);
218 	if (result != VDO_SUCCESS) {
219 		uninitialize_thread_config(config);
220 		return result;
221 	}
222 
223 	result = vdo_allocate(config->physical_zone_count, thread_id_t,
224 			      "physical thread array", &config->physical_threads);
225 	if (result != VDO_SUCCESS) {
226 		uninitialize_thread_config(config);
227 		return result;
228 	}
229 
230 	result = vdo_allocate(config->hash_zone_count, thread_id_t,
231 			      "hash thread array", &config->hash_zone_threads);
232 	if (result != VDO_SUCCESS) {
233 		uninitialize_thread_config(config);
234 		return result;
235 	}
236 
237 	result = vdo_allocate(config->bio_thread_count, thread_id_t,
238 			      "bio thread array", &config->bio_threads);
239 	if (result != VDO_SUCCESS) {
240 		uninitialize_thread_config(config);
241 		return result;
242 	}
243 
244 	if (single) {
245 		config->logical_threads[0] = config->thread_count;
246 		config->physical_threads[0] = config->thread_count;
247 		config->hash_zone_threads[0] = config->thread_count++;
248 	} else {
249 		config->admin_thread = config->thread_count;
250 		config->journal_thread = config->thread_count++;
251 		config->packer_thread = config->thread_count++;
252 		assign_thread_ids(config, config->logical_threads, counts.logical_zones);
253 		assign_thread_ids(config, config->physical_threads, counts.physical_zones);
254 		assign_thread_ids(config, config->hash_zone_threads, counts.hash_zones);
255 	}
256 
257 	config->dedupe_thread = config->thread_count++;
258 	config->bio_ack_thread =
259 		((counts.bio_ack_threads > 0) ? config->thread_count++ : VDO_INVALID_THREAD_ID);
260 	config->cpu_thread = config->thread_count++;
261 	assign_thread_ids(config, config->bio_threads, counts.bio_threads);
262 	return VDO_SUCCESS;
263 }
264 
265 /**
266  * read_geometry_block() - Synchronously read the geometry block from a vdo's underlying block
267  *                         device.
268  * @vdo: The vdo whose geometry is to be read.
269  *
270  * Return: VDO_SUCCESS or an error code.
271  */
272 static int __must_check read_geometry_block(struct vdo *vdo)
273 {
274 	struct vio *vio;
275 	char *block;
276 	int result;
277 
278 	result = vdo_allocate(VDO_BLOCK_SIZE, u8, __func__, &block);
279 	if (result != VDO_SUCCESS)
280 		return result;
281 
282 	result = create_metadata_vio(vdo, VIO_TYPE_GEOMETRY, VIO_PRIORITY_HIGH, NULL,
283 				     block, &vio);
284 	if (result != VDO_SUCCESS) {
285 		vdo_free(block);
286 		return result;
287 	}
288 
289 	/*
290 	 * This is only safe because, having not already loaded the geometry, the vdo's geometry's
291 	 * bio_offset field is 0, so the fact that vio_reset_bio() will subtract that offset from
292 	 * the supplied pbn is not a problem.
293 	 */
294 	result = vio_reset_bio(vio, block, NULL, REQ_OP_READ,
295 			       VDO_GEOMETRY_BLOCK_LOCATION);
296 	if (result != VDO_SUCCESS) {
297 		free_vio(vdo_forget(vio));
298 		vdo_free(block);
299 		return result;
300 	}
301 
302 	bio_set_dev(vio->bio, vdo_get_backing_device(vdo));
303 	submit_bio_wait(vio->bio);
304 	result = blk_status_to_errno(vio->bio->bi_status);
305 	free_vio(vdo_forget(vio));
306 	if (result != 0) {
307 		vdo_log_error_strerror(result, "synchronous read failed");
308 		vdo_free(block);
309 		return -EIO;
310 	}
311 
312 	result = vdo_parse_geometry_block((u8 *) block, &vdo->geometry);
313 	vdo_free(block);
314 	return result;
315 }
316 
317 static bool get_zone_thread_name(const thread_id_t thread_ids[], zone_count_t count,
318 				 thread_id_t id, const char *prefix,
319 				 char *buffer, size_t buffer_length)
320 {
321 	if (id >= thread_ids[0]) {
322 		thread_id_t index = id - thread_ids[0];
323 
324 		if (index < count) {
325 			snprintf(buffer, buffer_length, "%s%d", prefix, index);
326 			return true;
327 		}
328 	}
329 
330 	return false;
331 }
332 
333 /**
334  * get_thread_name() - Format the name of the worker thread desired to support a given work queue.
335  * @thread_config: The thread configuration.
336  * @thread_id: The thread id.
337  * @buffer: Where to put the formatted name.
338  * @buffer_length: Size of the output buffer.
339  *
340  * The physical layer may add a prefix identifying the product; the output from this function
341  * should just identify the thread.
342  */
343 static void get_thread_name(const struct thread_config *thread_config,
344 			    thread_id_t thread_id, char *buffer, size_t buffer_length)
345 {
346 	if (thread_id == thread_config->journal_thread) {
347 		if (thread_config->packer_thread == thread_id) {
348 			/*
349 			 * This is the "single thread" config where one thread is used for the
350 			 * journal, packer, logical, physical, and hash zones. In that case, it is
351 			 * known as the "request queue."
352 			 */
353 			snprintf(buffer, buffer_length, "reqQ");
354 			return;
355 		}
356 
357 		snprintf(buffer, buffer_length, "journalQ");
358 		return;
359 	} else if (thread_id == thread_config->admin_thread) {
360 		/* Theoretically this could be different from the journal thread. */
361 		snprintf(buffer, buffer_length, "adminQ");
362 		return;
363 	} else if (thread_id == thread_config->packer_thread) {
364 		snprintf(buffer, buffer_length, "packerQ");
365 		return;
366 	} else if (thread_id == thread_config->dedupe_thread) {
367 		snprintf(buffer, buffer_length, "dedupeQ");
368 		return;
369 	} else if (thread_id == thread_config->bio_ack_thread) {
370 		snprintf(buffer, buffer_length, "ackQ");
371 		return;
372 	} else if (thread_id == thread_config->cpu_thread) {
373 		snprintf(buffer, buffer_length, "cpuQ");
374 		return;
375 	}
376 
377 	if (get_zone_thread_name(thread_config->logical_threads,
378 				 thread_config->logical_zone_count,
379 				 thread_id, "logQ", buffer, buffer_length))
380 		return;
381 
382 	if (get_zone_thread_name(thread_config->physical_threads,
383 				 thread_config->physical_zone_count,
384 				 thread_id, "physQ", buffer, buffer_length))
385 		return;
386 
387 	if (get_zone_thread_name(thread_config->hash_zone_threads,
388 				 thread_config->hash_zone_count,
389 				 thread_id, "hashQ", buffer, buffer_length))
390 		return;
391 
392 	if (get_zone_thread_name(thread_config->bio_threads,
393 				 thread_config->bio_thread_count,
394 				 thread_id, "bioQ", buffer, buffer_length))
395 		return;
396 
397 	/* Some sort of misconfiguration? */
398 	snprintf(buffer, buffer_length, "reqQ%d", thread_id);
399 }
400 
401 /**
402  * vdo_make_thread() - Construct a single vdo work_queue and its associated thread (or threads for
403  *                     round-robin queues).
404  * @vdo: The vdo which owns the thread.
405  * @thread_id: The id of the thread to create (as determined by the thread_config).
406  * @type: The description of the work queue for this thread.
407  * @queue_count: The number of actual threads/queues contained in the "thread".
408  * @contexts: An array of queue_count contexts, one for each individual queue; may be NULL.
409  *
410  * Each "thread" constructed by this method is represented by a unique thread id in the thread
411  * config, and completions can be enqueued to the queue and run on the threads comprising this
412  * entity.
413  *
414  * Return: VDO_SUCCESS or an error.
415  */
416 int vdo_make_thread(struct vdo *vdo, thread_id_t thread_id,
417 		    const struct vdo_work_queue_type *type,
418 		    unsigned int queue_count, void *contexts[])
419 {
420 	struct vdo_thread *thread = &vdo->threads[thread_id];
421 	char queue_name[MAX_VDO_WORK_QUEUE_NAME_LEN];
422 
423 	if (type == NULL)
424 		type = &default_queue_type;
425 
426 	if (thread->queue != NULL) {
427 		return VDO_ASSERT(vdo_work_queue_type_is(thread->queue, type),
428 				  "already constructed vdo thread %u is of the correct type",
429 				  thread_id);
430 	}
431 
432 	thread->vdo = vdo;
433 	thread->thread_id = thread_id;
434 	get_thread_name(&vdo->thread_config, thread_id, queue_name, sizeof(queue_name));
435 	return vdo_make_work_queue(vdo->thread_name_prefix, queue_name, thread,
436 				   type, queue_count, contexts, &thread->queue);
437 }
438 
439 /**
440  * register_vdo() - Register a VDO; it must not already be registered.
441  * @vdo: The vdo to register.
442  *
443  * Return: VDO_SUCCESS or an error.
444  */
445 static int register_vdo(struct vdo *vdo)
446 {
447 	int result;
448 
449 	write_lock(&registry.lock);
450 	result = VDO_ASSERT(filter_vdos_locked(vdo_is_equal, vdo) == NULL,
451 			    "VDO not already registered");
452 	if (result == VDO_SUCCESS) {
453 		INIT_LIST_HEAD(&vdo->registration);
454 		list_add_tail(&vdo->registration, &registry.links);
455 	}
456 	write_unlock(&registry.lock);
457 
458 	return result;
459 }
460 
461 /**
462  * initialize_vdo() - Do the portion of initializing a vdo which will clean up after itself on
463  *                    error.
464  * @vdo: The vdo being initialized
465  * @config: The configuration of the vdo
466  * @instance: The instance number of the vdo
467  * @reason: The buffer to hold the failure reason on error
468  */
469 static int initialize_vdo(struct vdo *vdo, struct device_config *config,
470 			  unsigned int instance, char **reason)
471 {
472 	int result;
473 	zone_count_t i;
474 
475 	vdo->device_config = config;
476 	vdo->starting_sector_offset = config->owning_target->begin;
477 	vdo->instance = instance;
478 	vdo->allocations_allowed = true;
479 	vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_NEW);
480 	INIT_LIST_HEAD(&vdo->device_config_list);
481 	vdo_initialize_completion(&vdo->admin.completion, vdo, VDO_ADMIN_COMPLETION);
482 	init_completion(&vdo->admin.callback_sync);
483 	mutex_init(&vdo->stats_mutex);
484 	result = read_geometry_block(vdo);
485 	if (result != VDO_SUCCESS) {
486 		*reason = "Could not load geometry block";
487 		return result;
488 	}
489 
490 	result = initialize_thread_config(config->thread_counts, &vdo->thread_config);
491 	if (result != VDO_SUCCESS) {
492 		*reason = "Cannot create thread configuration";
493 		return result;
494 	}
495 
496 	vdo_log_info("zones: %d logical, %d physical, %d hash; total threads: %d",
497 		     config->thread_counts.logical_zones,
498 		     config->thread_counts.physical_zones,
499 		     config->thread_counts.hash_zones, vdo->thread_config.thread_count);
500 
501 	/* Compression context storage */
502 	result = vdo_allocate(config->thread_counts.cpu_threads, char *, "LZ4 context",
503 			      &vdo->compression_context);
504 	if (result != VDO_SUCCESS) {
505 		*reason = "cannot allocate LZ4 context";
506 		return result;
507 	}
508 
509 	for (i = 0; i < config->thread_counts.cpu_threads; i++) {
510 		result = vdo_allocate(LZ4_MEM_COMPRESS, char, "LZ4 context",
511 				      &vdo->compression_context[i]);
512 		if (result != VDO_SUCCESS) {
513 			*reason = "cannot allocate LZ4 context";
514 			return result;
515 		}
516 	}
517 
518 	result = register_vdo(vdo);
519 	if (result != VDO_SUCCESS) {
520 		*reason = "Cannot add VDO to device registry";
521 		return result;
522 	}
523 
524 	vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_INITIALIZED);
525 	return result;
526 }
527 
528 /**
529  * vdo_make() - Allocate and initialize a vdo.
530  * @instance: Device instantiation counter.
531  * @config: The device configuration.
532  * @reason: The reason for any failure during this call.
533  * @vdo_ptr: A pointer to hold the created vdo.
534  *
535  * Return: VDO_SUCCESS or an error.
536  */
537 int vdo_make(unsigned int instance, struct device_config *config, char **reason,
538 	     struct vdo **vdo_ptr)
539 {
540 	int result;
541 	struct vdo *vdo;
542 
543 	/* Initialize with a generic failure reason to prevent returning garbage. */
544 	*reason = "Unspecified error";
545 
546 	result = vdo_allocate(1, struct vdo, __func__, &vdo);
547 	if (result != VDO_SUCCESS) {
548 		*reason = "Cannot allocate VDO";
549 		return result;
550 	}
551 
552 	result = initialize_vdo(vdo, config, instance, reason);
553 	if (result != VDO_SUCCESS) {
554 		vdo_destroy(vdo);
555 		return result;
556 	}
557 
558 	/* From here on, the caller will clean up if there is an error. */
559 	*vdo_ptr = vdo;
560 
561 	snprintf(vdo->thread_name_prefix, sizeof(vdo->thread_name_prefix),
562 		 "%s%u", MODULE_NAME, instance);
563 	BUG_ON(vdo->thread_name_prefix[0] == '\0');
564 	result = vdo_allocate(vdo->thread_config.thread_count,
565 			      struct vdo_thread, __func__, &vdo->threads);
566 	if (result != VDO_SUCCESS) {
567 		*reason = "Cannot allocate thread structures";
568 		return result;
569 	}
570 
571 	result = vdo_make_thread(vdo, vdo->thread_config.admin_thread,
572 				 &default_queue_type, 1, NULL);
573 	if (result != VDO_SUCCESS) {
574 		*reason = "Cannot make admin thread";
575 		return result;
576 	}
577 
578 	result = vdo_make_flusher(vdo);
579 	if (result != VDO_SUCCESS) {
580 		*reason = "Cannot make flusher zones";
581 		return result;
582 	}
583 
584 	result = vdo_make_packer(vdo, DEFAULT_PACKER_BINS, &vdo->packer);
585 	if (result != VDO_SUCCESS) {
586 		*reason = "Cannot make packer zones";
587 		return result;
588 	}
589 
590 	BUG_ON(vdo->device_config->logical_block_size <= 0);
591 	BUG_ON(vdo->device_config->owned_device == NULL);
592 	result = make_data_vio_pool(vdo, MAXIMUM_VDO_USER_VIOS,
593 				    MAXIMUM_VDO_USER_VIOS * 3 / 4,
594 				    &vdo->data_vio_pool);
595 	if (result != VDO_SUCCESS) {
596 		*reason = "Cannot allocate data_vio pool";
597 		return result;
598 	}
599 
600 	result = vdo_make_io_submitter(config->thread_counts.bio_threads,
601 				       config->thread_counts.bio_rotation_interval,
602 				       get_data_vio_pool_request_limit(vdo->data_vio_pool),
603 				       vdo, &vdo->io_submitter);
604 	if (result != VDO_SUCCESS) {
605 		*reason = "bio submission initialization failed";
606 		return result;
607 	}
608 
609 	if (vdo_uses_bio_ack_queue(vdo)) {
610 		result = vdo_make_thread(vdo, vdo->thread_config.bio_ack_thread,
611 					 &bio_ack_q_type,
612 					 config->thread_counts.bio_ack_threads, NULL);
613 		if (result != VDO_SUCCESS) {
614 			*reason = "bio ack queue initialization failed";
615 			return result;
616 		}
617 	}
618 
619 	result = vdo_make_thread(vdo, vdo->thread_config.cpu_thread, &cpu_q_type,
620 				 config->thread_counts.cpu_threads,
621 				 (void **) vdo->compression_context);
622 	if (result != VDO_SUCCESS) {
623 		*reason = "CPU queue initialization failed";
624 		return result;
625 	}
626 
627 	return VDO_SUCCESS;
628 }
629 
630 static void finish_vdo(struct vdo *vdo)
631 {
632 	int i;
633 
634 	if (vdo->threads == NULL)
635 		return;
636 
637 	vdo_cleanup_io_submitter(vdo->io_submitter);
638 	vdo_finish_dedupe_index(vdo->hash_zones);
639 
640 	for (i = 0; i < vdo->thread_config.thread_count; i++)
641 		vdo_finish_work_queue(vdo->threads[i].queue);
642 }
643 
644 /**
645  * free_listeners() - Free the list of read-only listeners associated with a thread.
646  * @thread_data: The thread holding the list to free.
647  */
648 static void free_listeners(struct vdo_thread *thread)
649 {
650 	struct read_only_listener *listener, *next;
651 
652 	for (listener = vdo_forget(thread->listeners); listener != NULL; listener = next) {
653 		next = vdo_forget(listener->next);
654 		vdo_free(listener);
655 	}
656 }
657 
/*
 * Release the resources allocated by initialize_super_block(): the components of the super
 * block's vio, then the encoded super block buffer.
 */
static void uninitialize_super_block(struct vdo_super_block *super_block)
{
	free_vio_components(&super_block->vio);
	vdo_free(super_block->buffer);
}
663 
664 /**
665  * unregister_vdo() - Remove a vdo from the device registry.
666  * @vdo: The vdo to remove.
667  */
668 static void unregister_vdo(struct vdo *vdo)
669 {
670 	write_lock(&registry.lock);
671 	if (filter_vdos_locked(vdo_is_equal, vdo) == vdo)
672 		list_del_init(&vdo->registration);
673 
674 	write_unlock(&registry.lock);
675 }
676 
677 /**
678  * vdo_destroy() - Destroy a vdo instance.
679  * @vdo: The vdo to destroy (may be NULL).
680  */
681 void vdo_destroy(struct vdo *vdo)
682 {
683 	unsigned int i;
684 
685 	if (vdo == NULL)
686 		return;
687 
688 	/* A running VDO should never be destroyed without suspending first. */
689 	BUG_ON(vdo_get_admin_state(vdo)->normal);
690 
691 	vdo->allocations_allowed = true;
692 
693 	finish_vdo(vdo);
694 	unregister_vdo(vdo);
695 	free_data_vio_pool(vdo->data_vio_pool);
696 	vdo_free_io_submitter(vdo_forget(vdo->io_submitter));
697 	vdo_free_flusher(vdo_forget(vdo->flusher));
698 	vdo_free_packer(vdo_forget(vdo->packer));
699 	vdo_free_recovery_journal(vdo_forget(vdo->recovery_journal));
700 	vdo_free_slab_depot(vdo_forget(vdo->depot));
701 	vdo_uninitialize_layout(&vdo->layout);
702 	vdo_uninitialize_layout(&vdo->next_layout);
703 	if (vdo->partition_copier)
704 		dm_kcopyd_client_destroy(vdo_forget(vdo->partition_copier));
705 	uninitialize_super_block(&vdo->super_block);
706 	vdo_free_block_map(vdo_forget(vdo->block_map));
707 	vdo_free_hash_zones(vdo_forget(vdo->hash_zones));
708 	vdo_free_physical_zones(vdo_forget(vdo->physical_zones));
709 	vdo_free_logical_zones(vdo_forget(vdo->logical_zones));
710 
711 	if (vdo->threads != NULL) {
712 		for (i = 0; i < vdo->thread_config.thread_count; i++) {
713 			free_listeners(&vdo->threads[i]);
714 			vdo_free_work_queue(vdo_forget(vdo->threads[i].queue));
715 		}
716 		vdo_free(vdo_forget(vdo->threads));
717 	}
718 
719 	uninitialize_thread_config(&vdo->thread_config);
720 
721 	if (vdo->compression_context != NULL) {
722 		for (i = 0; i < vdo->device_config->thread_counts.cpu_threads; i++)
723 			vdo_free(vdo_forget(vdo->compression_context[i]));
724 
725 		vdo_free(vdo_forget(vdo->compression_context));
726 	}
727 	vdo_free(vdo);
728 }
729 
730 static int initialize_super_block(struct vdo *vdo, struct vdo_super_block *super_block)
731 {
732 	int result;
733 
734 	result = vdo_allocate(VDO_BLOCK_SIZE, char, "encoded super block",
735 			      (char **) &vdo->super_block.buffer);
736 	if (result != VDO_SUCCESS)
737 		return result;
738 
739 	return allocate_vio_components(vdo, VIO_TYPE_SUPER_BLOCK,
740 				       VIO_PRIORITY_METADATA, NULL, 1,
741 				       (char *) super_block->buffer,
742 				       &vdo->super_block.vio);
743 }
744 
745 /**
746  * finish_reading_super_block() - Continue after loading the super block.
747  * @completion: The super block vio.
748  *
749  * This callback is registered in vdo_load_super_block().
750  */
751 static void finish_reading_super_block(struct vdo_completion *completion)
752 {
753 	struct vdo_super_block *super_block =
754 		container_of(as_vio(completion), struct vdo_super_block, vio);
755 
756 	vdo_continue_completion(vdo_forget(completion->parent),
757 				vdo_decode_super_block(super_block->buffer));
758 }
759 
/**
 * handle_super_block_read_error() - Handle an error reading the super block.
 * @completion: The super block vio.
 *
 * Records the I/O error against the vio and then proceeds exactly as a successful read would, by
 * calling finish_reading_super_block().
 *
 * This error handler is registered in vdo_load_super_block().
 */
static void handle_super_block_read_error(struct vdo_completion *completion)
{
	struct vio *failed_vio = as_vio(completion);

	vio_record_metadata_io_error(failed_vio);
	finish_reading_super_block(completion);
}
771 
772 static void read_super_block_endio(struct bio *bio)
773 {
774 	struct vio *vio = bio->bi_private;
775 	struct vdo_completion *parent = vio->completion.parent;
776 
777 	continue_vio_after_io(vio, finish_reading_super_block,
778 			      parent->callback_thread_id);
779 }
780 
781 /**
782  * vdo_load_super_block() - Allocate a super block and read its contents from storage.
783  * @vdo: The vdo containing the super block on disk.
784  * @parent: The completion to notify after loading the super block.
785  */
786 void vdo_load_super_block(struct vdo *vdo, struct vdo_completion *parent)
787 {
788 	int result;
789 
790 	result = initialize_super_block(vdo, &vdo->super_block);
791 	if (result != VDO_SUCCESS) {
792 		vdo_continue_completion(parent, result);
793 		return;
794 	}
795 
796 	vdo->super_block.vio.completion.parent = parent;
797 	vdo_submit_metadata_vio(&vdo->super_block.vio,
798 				vdo_get_data_region_start(vdo->geometry),
799 				read_super_block_endio,
800 				handle_super_block_read_error,
801 				REQ_OP_READ);
802 }
803 
804 /**
805  * vdo_get_backing_device() - Get the block device object underlying a vdo.
806  * @vdo: The vdo.
807  *
808  * Return: The vdo's current block device.
809  */
810 struct block_device *vdo_get_backing_device(const struct vdo *vdo)
811 {
812 	return vdo->device_config->owned_device->bdev;
813 }
814 
815 /**
816  * vdo_get_device_name() - Get the device name associated with the vdo target.
817  * @target: The target device interface.
818  *
819  * Return: The block device name.
820  */
821 const char *vdo_get_device_name(const struct dm_target *target)
822 {
823 	return dm_device_name(dm_table_get_md(target->table));
824 }
825 
826 /**
827  * vdo_synchronous_flush() - Issue a flush request and wait for it to complete.
828  * @vdo: The vdo.
829  *
830  * Return: VDO_SUCCESS or an error.
831  */
832 int vdo_synchronous_flush(struct vdo *vdo)
833 {
834 	int result;
835 	struct bio bio;
836 
837 	bio_init(&bio, vdo_get_backing_device(vdo), NULL, 0,
838 		 REQ_OP_WRITE | REQ_PREFLUSH);
839 	submit_bio_wait(&bio);
840 	result = blk_status_to_errno(bio.bi_status);
841 
842 	atomic64_inc(&vdo->stats.flush_out);
843 	if (result != 0) {
844 		vdo_log_error_strerror(result, "synchronous flush failed");
845 		result = -EIO;
846 	}
847 
848 	bio_uninit(&bio);
849 	return result;
850 }
851 
/**
 * vdo_get_state() - Get the current state of the vdo.
 * @vdo: The vdo.
 *
 * Context: This method may be called from any thread.
 *
 * Return: The current state of the vdo.
 */
enum vdo_state vdo_get_state(const struct vdo *vdo)
{
	/* The state field is read atomically before the read barrier below. */
	enum vdo_state state = atomic_read(&vdo->state);

	/* pairs with barriers where state field is changed */
	smp_rmb();
	return state;
}
868 
/**
 * vdo_set_state() - Set the current state of the vdo.
 * @vdo: The vdo whose state is to be set.
 * @state: The new state of the vdo.
 *
 * A write barrier is issued before the state field is atomically updated.
 *
 * Context: This method may be called from any thread.
 */
void vdo_set_state(struct vdo *vdo, enum vdo_state state)
{
	/* pairs with barrier in vdo_get_state */
	smp_wmb();
	atomic_set(&vdo->state, state);
}
882 
/**
 * vdo_get_admin_state() - Get the admin state of the vdo.
 * @vdo: The vdo.
 *
 * This is the administrative state held in vdo->admin.state, which is distinct from the value
 * returned by vdo_get_state().
 *
 * Return: The code for the vdo's current admin state.
 */
const struct admin_state_code *vdo_get_admin_state(const struct vdo *vdo)
{
	return vdo_get_admin_state_code(&vdo->admin.state);
}
893 
/**
 * record_vdo() - Record the state of the VDO for encoding in the super block.
 * @vdo: The vdo whose state is to be recorded.
 *
 * Fills in vdo->states from the vdo's current state, its block map, recovery journal and slab
 * depot, and its layout. vdo_save_components() calls this just before encoding the super block.
 */
static void record_vdo(struct vdo *vdo)
{
	/* This is for backwards compatibility. */
	vdo->states.unused = vdo->geometry.unused;
	vdo->states.vdo.state = vdo_get_state(vdo);
	vdo->states.block_map = vdo_record_block_map(vdo->block_map);
	vdo->states.recovery_journal = vdo_record_recovery_journal(vdo->recovery_journal);
	vdo->states.slab_depot = vdo_record_slab_depot(vdo->depot);
	vdo->states.layout = vdo->layout;
}
907 
/**
 * continue_super_block_parent() - Continue the parent of a super block save operation.
 * @completion: The super block vio.
 *
 * The parent is continued with the vio's own result, and the vio's parent pointer is cleared.
 * vdo_save_components() treats a non-NULL parent as a save still in progress.
 *
 * This callback is registered in vdo_save_components().
 */
static void continue_super_block_parent(struct vdo_completion *completion)
{
	vdo_continue_completion(vdo_forget(completion->parent), completion->result);
}
918 
919 /**
920  * handle_save_error() - Log a super block save error.
921  * @completion: The super block vio.
922  *
923  * This error handler is registered in vdo_save_components().
924  */
925 static void handle_save_error(struct vdo_completion *completion)
926 {
927 	struct vdo_super_block *super_block =
928 		container_of(as_vio(completion), struct vdo_super_block, vio);
929 
930 	vio_record_metadata_io_error(&super_block->vio);
931 	vdo_log_error_strerror(completion->result, "super block save failed");
932 	/*
933 	 * Mark the super block as unwritable so that we won't attempt to write it again. This
934 	 * avoids the case where a growth attempt fails writing the super block with the new size,
935 	 * but the subsequent attempt to write out the read-only state succeeds. In this case,
936 	 * writes which happened just before the suspend would not be visible if the VDO is
937 	 * restarted without rebuilding, but, after a read-only rebuild, the effects of those
938 	 * writes would reappear.
939 	 */
940 	super_block->unwritable = true;
941 	completion->callback(completion);
942 }
943 
944 static void super_block_write_endio(struct bio *bio)
945 {
946 	struct vio *vio = bio->bi_private;
947 	struct vdo_completion *parent = vio->completion.parent;
948 
949 	continue_vio_after_io(vio, continue_super_block_parent,
950 			      parent->callback_thread_id);
951 }
952 
953 /**
954  * vdo_save_components() - Encode the vdo and save the super block asynchronously.
955  * @vdo: The vdo whose state is being saved.
956  * @parent: The completion to notify when the save is complete.
957  */
958 void vdo_save_components(struct vdo *vdo, struct vdo_completion *parent)
959 {
960 	struct vdo_super_block *super_block = &vdo->super_block;
961 
962 	if (super_block->unwritable) {
963 		vdo_continue_completion(parent, VDO_READ_ONLY);
964 		return;
965 	}
966 
967 	if (super_block->vio.completion.parent != NULL) {
968 		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
969 		return;
970 	}
971 
972 	record_vdo(vdo);
973 
974 	vdo_encode_super_block(super_block->buffer, &vdo->states);
975 	super_block->vio.completion.parent = parent;
976 	super_block->vio.completion.callback_thread_id = parent->callback_thread_id;
977 	vdo_submit_metadata_vio(&super_block->vio,
978 				vdo_get_data_region_start(vdo->geometry),
979 				super_block_write_endio, handle_save_error,
980 				REQ_OP_WRITE | REQ_PREFLUSH | REQ_FUA);
981 }
982 
983 /**
984  * vdo_register_read_only_listener() - Register a listener to be notified when the VDO goes
985  *                                     read-only.
986  * @vdo: The vdo to register with.
987  * @listener: The object to notify.
988  * @notification: The function to call to send the notification.
989  * @thread_id: The id of the thread on which to send the notification.
990  *
991  * Return: VDO_SUCCESS or an error.
992  */
993 int vdo_register_read_only_listener(struct vdo *vdo, void *listener,
994 				    vdo_read_only_notification_fn notification,
995 				    thread_id_t thread_id)
996 {
997 	struct vdo_thread *thread = &vdo->threads[thread_id];
998 	struct read_only_listener *read_only_listener;
999 	int result;
1000 
1001 	result = VDO_ASSERT(thread_id != vdo->thread_config.dedupe_thread,
1002 			    "read only listener not registered on dedupe thread");
1003 	if (result != VDO_SUCCESS)
1004 		return result;
1005 
1006 	result = vdo_allocate(1, struct read_only_listener, __func__,
1007 			      &read_only_listener);
1008 	if (result != VDO_SUCCESS)
1009 		return result;
1010 
1011 	*read_only_listener = (struct read_only_listener) {
1012 		.listener = listener,
1013 		.notify = notification,
1014 		.next = thread->listeners,
1015 	};
1016 
1017 	thread->listeners = read_only_listener;
1018 	return VDO_SUCCESS;
1019 }
1020 
1021 /**
1022  * notify_vdo_of_read_only_mode() - Notify a vdo that it is going read-only.
1023  * @listener: The vdo.
1024  * @parent: The completion to notify in order to acknowledge the notification.
1025  *
1026  * This will save the read-only state to the super block.
1027  *
1028  * Implements vdo_read_only_notification_fn.
1029  */
1030 static void notify_vdo_of_read_only_mode(void *listener, struct vdo_completion *parent)
1031 {
1032 	struct vdo *vdo = listener;
1033 
1034 	if (vdo_in_read_only_mode(vdo))
1035 		vdo_finish_completion(parent);
1036 
1037 	vdo_set_state(vdo, VDO_READ_ONLY_MODE);
1038 	vdo_save_components(vdo, parent);
1039 }
1040 
1041 /**
1042  * vdo_enable_read_only_entry() - Enable a vdo to enter read-only mode on errors.
1043  * @vdo: The vdo to enable.
1044  *
1045  * Return: VDO_SUCCESS or an error.
1046  */
1047 int vdo_enable_read_only_entry(struct vdo *vdo)
1048 {
1049 	thread_id_t id;
1050 	bool is_read_only = vdo_in_read_only_mode(vdo);
1051 	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1052 
1053 	if (is_read_only) {
1054 		notifier->read_only_error = VDO_READ_ONLY;
1055 		notifier->state = NOTIFIED;
1056 	} else {
1057 		notifier->state = MAY_NOT_NOTIFY;
1058 	}
1059 
1060 	spin_lock_init(&notifier->lock);
1061 	vdo_initialize_completion(&notifier->completion, vdo,
1062 				  VDO_READ_ONLY_MODE_COMPLETION);
1063 
1064 	for (id = 0; id < vdo->thread_config.thread_count; id++)
1065 		vdo->threads[id].is_read_only = is_read_only;
1066 
1067 	return vdo_register_read_only_listener(vdo, vdo, notify_vdo_of_read_only_mode,
1068 					       vdo->thread_config.admin_thread);
1069 }
1070 
1071 /**
1072  * vdo_wait_until_not_entering_read_only_mode() - Wait until no read-only notifications are in
1073  *                                                progress and prevent any subsequent
1074  *                                                notifications.
1075  * @parent: The completion to notify when no threads are entering read-only mode.
1076  *
1077  * Notifications may be re-enabled by calling vdo_allow_read_only_mode_entry().
1078  */
1079 void vdo_wait_until_not_entering_read_only_mode(struct vdo_completion *parent)
1080 {
1081 	struct vdo *vdo = parent->vdo;
1082 	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1083 
1084 	vdo_assert_on_admin_thread(vdo, __func__);
1085 
1086 	if (notifier->waiter != NULL) {
1087 		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
1088 		return;
1089 	}
1090 
1091 	spin_lock(&notifier->lock);
1092 	if (notifier->state == NOTIFYING)
1093 		notifier->waiter = parent;
1094 	else if (notifier->state == MAY_NOTIFY)
1095 		notifier->state = MAY_NOT_NOTIFY;
1096 	spin_unlock(&notifier->lock);
1097 
1098 	if (notifier->waiter == NULL) {
1099 		/*
1100 		 * A notification was not in progress, and now they are
1101 		 * disallowed.
1102 		 */
1103 		vdo_launch_completion(parent);
1104 		return;
1105 	}
1106 }
1107 
1108 /**
1109  * as_notifier() - Convert a generic vdo_completion to a read_only_notifier.
1110  * @completion: The completion to convert.
1111  *
1112  * Return: The completion as a read_only_notifier.
1113  */
1114 static inline struct read_only_notifier *as_notifier(struct vdo_completion *completion)
1115 {
1116 	vdo_assert_completion_type(completion, VDO_READ_ONLY_MODE_COMPLETION);
1117 	return container_of(completion, struct read_only_notifier, completion);
1118 }
1119 
1120 /**
1121  * finish_entering_read_only_mode() - Complete the process of entering read only mode.
1122  * @completion: The read-only mode completion.
1123  */
1124 static void finish_entering_read_only_mode(struct vdo_completion *completion)
1125 {
1126 	struct read_only_notifier *notifier = as_notifier(completion);
1127 
1128 	vdo_assert_on_admin_thread(completion->vdo, __func__);
1129 
1130 	spin_lock(&notifier->lock);
1131 	notifier->state = NOTIFIED;
1132 	spin_unlock(&notifier->lock);
1133 
1134 	if (notifier->waiter != NULL)
1135 		vdo_continue_completion(vdo_forget(notifier->waiter),
1136 					completion->result);
1137 }
1138 
1139 /**
1140  * make_thread_read_only() - Inform each thread that the VDO is in read-only mode.
1141  * @completion: The read-only mode completion.
1142  */
1143 static void make_thread_read_only(struct vdo_completion *completion)
1144 {
1145 	struct vdo *vdo = completion->vdo;
1146 	thread_id_t thread_id = completion->callback_thread_id;
1147 	struct read_only_notifier *notifier = as_notifier(completion);
1148 	struct read_only_listener *listener = completion->parent;
1149 
1150 	if (listener == NULL) {
1151 		/* This is the first call on this thread */
1152 		struct vdo_thread *thread = &vdo->threads[thread_id];
1153 
1154 		thread->is_read_only = true;
1155 		listener = thread->listeners;
1156 		if (thread_id == 0)
1157 			vdo_log_error_strerror(READ_ONCE(notifier->read_only_error),
1158 					       "Unrecoverable error, entering read-only mode");
1159 	} else {
1160 		/* We've just finished notifying a listener */
1161 		listener = listener->next;
1162 	}
1163 
1164 	if (listener != NULL) {
1165 		/* We have a listener to notify */
1166 		vdo_prepare_completion(completion, make_thread_read_only,
1167 				       make_thread_read_only, thread_id,
1168 				       listener);
1169 		listener->notify(listener->listener, completion);
1170 		return;
1171 	}
1172 
1173 	/* We're done with this thread */
1174 	if (++thread_id == vdo->thread_config.dedupe_thread) {
1175 		/*
1176 		 * We don't want to notify the dedupe thread since it may be
1177 		 * blocked rebuilding the index.
1178 		 */
1179 		thread_id++;
1180 	}
1181 
1182 	if (thread_id >= vdo->thread_config.thread_count) {
1183 		/* There are no more threads */
1184 		vdo_prepare_completion(completion, finish_entering_read_only_mode,
1185 				       finish_entering_read_only_mode,
1186 				       vdo->thread_config.admin_thread, NULL);
1187 	} else {
1188 		vdo_prepare_completion(completion, make_thread_read_only,
1189 				       make_thread_read_only, thread_id, NULL);
1190 	}
1191 
1192 	vdo_launch_completion(completion);
1193 }
1194 
1195 /**
1196  * vdo_allow_read_only_mode_entry() - Allow the notifier to put the VDO into read-only mode,
1197  *                                    reversing the effects of
1198  *                                    vdo_wait_until_not_entering_read_only_mode().
1199  * @parent: The object to notify once the operation is complete.
1200  *
1201  * If some thread tried to put the vdo into read-only mode while notifications were disallowed, it
1202  * will be done when this method is called. If that happens, the parent will not be notified until
1203  * the vdo has actually entered read-only mode and attempted to save the super block.
1204  *
1205  * Context: This method may only be called from the admin thread.
1206  */
1207 void vdo_allow_read_only_mode_entry(struct vdo_completion *parent)
1208 {
1209 	struct vdo *vdo = parent->vdo;
1210 	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1211 
1212 	vdo_assert_on_admin_thread(vdo, __func__);
1213 
1214 	if (notifier->waiter != NULL) {
1215 		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
1216 		return;
1217 	}
1218 
1219 	spin_lock(&notifier->lock);
1220 	if (notifier->state == MAY_NOT_NOTIFY) {
1221 		if (notifier->read_only_error == VDO_SUCCESS) {
1222 			notifier->state = MAY_NOTIFY;
1223 		} else {
1224 			notifier->state = NOTIFYING;
1225 			notifier->waiter = parent;
1226 		}
1227 	}
1228 	spin_unlock(&notifier->lock);
1229 
1230 	if (notifier->waiter == NULL) {
1231 		/* We're done */
1232 		vdo_launch_completion(parent);
1233 		return;
1234 	}
1235 
1236 	/* Do the pending notification. */
1237 	make_thread_read_only(&notifier->completion);
1238 }
1239 
1240 /**
1241  * vdo_enter_read_only_mode() - Put a VDO into read-only mode and save the read-only state in the
1242  *                              super block.
1243  * @vdo: The vdo.
1244  * @error_code: The error which caused the VDO to enter read-only mode.
1245  *
1246  * This method is a no-op if the VDO is already read-only.
1247  */
1248 void vdo_enter_read_only_mode(struct vdo *vdo, int error_code)
1249 {
1250 	bool notify = false;
1251 	thread_id_t thread_id = vdo_get_callback_thread_id();
1252 	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1253 	struct vdo_thread *thread;
1254 
1255 	if (thread_id != VDO_INVALID_THREAD_ID) {
1256 		thread = &vdo->threads[thread_id];
1257 		if (thread->is_read_only) {
1258 			/* This thread has already gone read-only. */
1259 			return;
1260 		}
1261 
1262 		/* Record for this thread that the VDO is read-only. */
1263 		thread->is_read_only = true;
1264 	}
1265 
1266 	spin_lock(&notifier->lock);
1267 	if (notifier->read_only_error == VDO_SUCCESS) {
1268 		WRITE_ONCE(notifier->read_only_error, error_code);
1269 		if (notifier->state == MAY_NOTIFY) {
1270 			notifier->state = NOTIFYING;
1271 			notify = true;
1272 		}
1273 	}
1274 	spin_unlock(&notifier->lock);
1275 
1276 	if (!notify) {
1277 		/* The notifier is already aware of a read-only error */
1278 		return;
1279 	}
1280 
1281 	/* Initiate a notification starting on the lowest numbered thread. */
1282 	vdo_launch_completion_callback(&notifier->completion, make_thread_read_only, 0);
1283 }
1284 
1285 /**
1286  * vdo_is_read_only() - Check whether the VDO is read-only.
1287  * @vdo: The vdo.
1288  *
1289  * Return: true if the vdo is read-only.
1290  *
1291  * This method may be called from any thread, as opposed to examining the VDO's state field which
1292  * is only safe to check from the admin thread.
1293  */
1294 bool vdo_is_read_only(struct vdo *vdo)
1295 {
1296 	return vdo->threads[vdo_get_callback_thread_id()].is_read_only;
1297 }
1298 
1299 /**
1300  * vdo_in_read_only_mode() - Check whether a vdo is in read-only mode.
1301  * @vdo: The vdo to query.
1302  *
1303  * Return: true if the vdo is in read-only mode.
1304  */
1305 bool vdo_in_read_only_mode(const struct vdo *vdo)
1306 {
1307 	return (vdo_get_state(vdo) == VDO_READ_ONLY_MODE);
1308 }
1309 
1310 /**
1311  * vdo_in_recovery_mode() - Check whether the vdo is in recovery mode.
1312  * @vdo: The vdo to query.
1313  *
1314  * Return: true if the vdo is in recovery mode.
1315  */
1316 bool vdo_in_recovery_mode(const struct vdo *vdo)
1317 {
1318 	return (vdo_get_state(vdo) == VDO_RECOVERING);
1319 }
1320 
1321 /**
1322  * vdo_enter_recovery_mode() - Put the vdo into recovery mode.
1323  * @vdo: The vdo.
1324  */
1325 void vdo_enter_recovery_mode(struct vdo *vdo)
1326 {
1327 	vdo_assert_on_admin_thread(vdo, __func__);
1328 
1329 	if (vdo_in_read_only_mode(vdo))
1330 		return;
1331 
1332 	vdo_log_info("Entering recovery mode");
1333 	vdo_set_state(vdo, VDO_RECOVERING);
1334 }
1335 
1336 /**
1337  * complete_synchronous_action() - Signal the waiting thread that a synchronous action is complete.
1338  * @completion: The sync completion.
1339  */
1340 static void complete_synchronous_action(struct vdo_completion *completion)
1341 {
1342 	vdo_assert_completion_type(completion, VDO_SYNC_COMPLETION);
1343 	complete(&(container_of(completion, struct sync_completion,
1344 				vdo_completion)->completion));
1345 }
1346 
1347 /**
1348  * perform_synchronous_action() - Launch an action on a VDO thread and wait for it to complete.
1349  * @vdo: The vdo.
1350  * @action: The callback to launch.
1351  * @thread_id: The thread on which to run the action.
1352  * @parent: The parent of the sync completion (may be NULL).
1353  */
1354 static int perform_synchronous_action(struct vdo *vdo, vdo_action_fn action,
1355 				      thread_id_t thread_id, void *parent)
1356 {
1357 	struct sync_completion sync;
1358 
1359 	vdo_initialize_completion(&sync.vdo_completion, vdo, VDO_SYNC_COMPLETION);
1360 	init_completion(&sync.completion);
1361 	sync.vdo_completion.parent = parent;
1362 	vdo_launch_completion_callback(&sync.vdo_completion, action, thread_id);
1363 	wait_for_completion(&sync.completion);
1364 	return sync.vdo_completion.result;
1365 }
1366 
1367 /**
1368  * set_compression_callback() - Callback to turn compression on or off.
1369  * @completion: The completion.
1370  */
1371 static void set_compression_callback(struct vdo_completion *completion)
1372 {
1373 	struct vdo *vdo = completion->vdo;
1374 	bool *enable = completion->parent;
1375 	bool was_enabled = vdo_get_compressing(vdo);
1376 
1377 	if (*enable != was_enabled) {
1378 		WRITE_ONCE(vdo->compressing, *enable);
1379 		if (was_enabled) {
1380 			/* Signal the packer to flush since compression has been disabled. */
1381 			vdo_flush_packer(vdo->packer);
1382 		}
1383 	}
1384 
1385 	vdo_log_info("compression is %s", (*enable ? "enabled" : "disabled"));
1386 	*enable = was_enabled;
1387 	complete_synchronous_action(completion);
1388 }
1389 
1390 /**
1391  * vdo_set_compressing() - Turn compression on or off.
1392  * @vdo: The vdo.
1393  * @enable: Whether to enable or disable compression.
1394  *
1395  * Return: Whether compression was previously on or off.
1396  */
1397 bool vdo_set_compressing(struct vdo *vdo, bool enable)
1398 {
1399 	perform_synchronous_action(vdo, set_compression_callback,
1400 				   vdo->thread_config.packer_thread,
1401 				   &enable);
1402 	return enable;
1403 }
1404 
1405 /**
1406  * vdo_get_compressing() - Get whether compression is enabled in a vdo.
1407  * @vdo: The vdo.
1408  *
1409  * Return: State of compression.
1410  */
1411 bool vdo_get_compressing(struct vdo *vdo)
1412 {
1413 	return READ_ONCE(vdo->compressing);
1414 }
1415 
1416 static size_t get_block_map_cache_size(const struct vdo *vdo)
1417 {
1418 	return ((size_t) vdo->device_config->cache_size) * VDO_BLOCK_SIZE;
1419 }
1420 
1421 static struct error_statistics __must_check get_vdo_error_statistics(const struct vdo *vdo)
1422 {
1423 	/*
1424 	 * The error counts can be incremented from arbitrary threads and so must be incremented
1425 	 * atomically, but they are just statistics with no semantics that could rely on memory
1426 	 * order, so unfenced reads are sufficient.
1427 	 */
1428 	const struct atomic_statistics *atoms = &vdo->stats;
1429 
1430 	return (struct error_statistics) {
1431 		.invalid_advice_pbn_count = atomic64_read(&atoms->invalid_advice_pbn_count),
1432 		.no_space_error_count = atomic64_read(&atoms->no_space_error_count),
1433 		.read_only_error_count = atomic64_read(&atoms->read_only_error_count),
1434 	};
1435 }
1436 
1437 static void copy_bio_stat(struct bio_stats *b, const struct atomic_bio_stats *a)
1438 {
1439 	b->read = atomic64_read(&a->read);
1440 	b->write = atomic64_read(&a->write);
1441 	b->discard = atomic64_read(&a->discard);
1442 	b->flush = atomic64_read(&a->flush);
1443 	b->empty_flush = atomic64_read(&a->empty_flush);
1444 	b->fua = atomic64_read(&a->fua);
1445 }
1446 
1447 static struct bio_stats subtract_bio_stats(struct bio_stats minuend,
1448 					   struct bio_stats subtrahend)
1449 {
1450 	return (struct bio_stats) {
1451 		.read = minuend.read - subtrahend.read,
1452 		.write = minuend.write - subtrahend.write,
1453 		.discard = minuend.discard - subtrahend.discard,
1454 		.flush = minuend.flush - subtrahend.flush,
1455 		.empty_flush = minuend.empty_flush - subtrahend.empty_flush,
1456 		.fua = minuend.fua - subtrahend.fua,
1457 	};
1458 }
1459 
1460 /**
1461  * vdo_get_physical_blocks_allocated() - Get the number of physical blocks in use by user data.
1462  * @vdo: The vdo.
1463  *
1464  * Return: The number of blocks allocated for user data.
1465  */
1466 static block_count_t __must_check vdo_get_physical_blocks_allocated(const struct vdo *vdo)
1467 {
1468 	return (vdo_get_slab_depot_allocated_blocks(vdo->depot) -
1469 		vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal));
1470 }
1471 
1472 /**
1473  * vdo_get_physical_blocks_overhead() - Get the number of physical blocks used by vdo metadata.
1474  * @vdo: The vdo.
1475  *
1476  * Return: The number of overhead blocks.
1477  */
1478 static block_count_t __must_check vdo_get_physical_blocks_overhead(const struct vdo *vdo)
1479 {
1480 	/*
1481 	 * config.physical_blocks is mutated during resize and is in a packed structure,
1482 	 * but resize runs on admin thread.
1483 	 * TODO: Verify that this is always safe.
1484 	 */
1485 	return (vdo->states.vdo.config.physical_blocks -
1486 		vdo_get_slab_depot_data_blocks(vdo->depot) +
1487 		vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal));
1488 }
1489 
1490 static const char *vdo_describe_state(enum vdo_state state)
1491 {
1492 	/* These strings should all fit in the 15 chars of VDOStatistics.mode. */
1493 	switch (state) {
1494 	case VDO_RECOVERING:
1495 		return "recovering";
1496 
1497 	case VDO_READ_ONLY_MODE:
1498 		return "read-only";
1499 
1500 	default:
1501 		return "normal";
1502 	}
1503 }
1504 
1505 /**
1506  * get_vdo_statistics() - Populate a vdo_statistics structure on the admin thread.
1507  * @vdo: The vdo.
1508  * @stats: The statistics structure to populate.
1509  */
1510 static void get_vdo_statistics(const struct vdo *vdo, struct vdo_statistics *stats)
1511 {
1512 	struct recovery_journal *journal = vdo->recovery_journal;
1513 	enum vdo_state state = vdo_get_state(vdo);
1514 
1515 	vdo_assert_on_admin_thread(vdo, __func__);
1516 
1517 	/* start with a clean slate */
1518 	memset(stats, 0, sizeof(struct vdo_statistics));
1519 
1520 	/*
1521 	 * These are immutable properties of the vdo object, so it is safe to query them from any
1522 	 * thread.
1523 	 */
1524 	stats->version = STATISTICS_VERSION;
1525 	stats->logical_blocks = vdo->states.vdo.config.logical_blocks;
1526 	/*
1527 	 * config.physical_blocks is mutated during resize and is in a packed structure, but resize
1528 	 * runs on the admin thread.
1529 	 * TODO: verify that this is always safe
1530 	 */
1531 	stats->physical_blocks = vdo->states.vdo.config.physical_blocks;
1532 	stats->block_size = VDO_BLOCK_SIZE;
1533 	stats->complete_recoveries = vdo->states.vdo.complete_recoveries;
1534 	stats->read_only_recoveries = vdo->states.vdo.read_only_recoveries;
1535 	stats->block_map_cache_size = get_block_map_cache_size(vdo);
1536 
1537 	/* The callees are responsible for thread-safety. */
1538 	stats->data_blocks_used = vdo_get_physical_blocks_allocated(vdo);
1539 	stats->overhead_blocks_used = vdo_get_physical_blocks_overhead(vdo);
1540 	stats->logical_blocks_used = vdo_get_recovery_journal_logical_blocks_used(journal);
1541 	vdo_get_slab_depot_statistics(vdo->depot, stats);
1542 	stats->journal = vdo_get_recovery_journal_statistics(journal);
1543 	stats->packer = vdo_get_packer_statistics(vdo->packer);
1544 	stats->block_map = vdo_get_block_map_statistics(vdo->block_map);
1545 	vdo_get_dedupe_statistics(vdo->hash_zones, stats);
1546 	stats->errors = get_vdo_error_statistics(vdo);
1547 	stats->in_recovery_mode = (state == VDO_RECOVERING);
1548 	snprintf(stats->mode, sizeof(stats->mode), "%s", vdo_describe_state(state));
1549 
1550 	stats->instance = vdo->instance;
1551 	stats->current_vios_in_progress = get_data_vio_pool_active_requests(vdo->data_vio_pool);
1552 	stats->max_vios = get_data_vio_pool_maximum_requests(vdo->data_vio_pool);
1553 
1554 	stats->flush_out = atomic64_read(&vdo->stats.flush_out);
1555 	stats->logical_block_size = vdo->device_config->logical_block_size;
1556 	copy_bio_stat(&stats->bios_in, &vdo->stats.bios_in);
1557 	copy_bio_stat(&stats->bios_in_partial, &vdo->stats.bios_in_partial);
1558 	copy_bio_stat(&stats->bios_out, &vdo->stats.bios_out);
1559 	copy_bio_stat(&stats->bios_meta, &vdo->stats.bios_meta);
1560 	copy_bio_stat(&stats->bios_journal, &vdo->stats.bios_journal);
1561 	copy_bio_stat(&stats->bios_page_cache, &vdo->stats.bios_page_cache);
1562 	copy_bio_stat(&stats->bios_out_completed, &vdo->stats.bios_out_completed);
1563 	copy_bio_stat(&stats->bios_meta_completed, &vdo->stats.bios_meta_completed);
1564 	copy_bio_stat(&stats->bios_journal_completed,
1565 		      &vdo->stats.bios_journal_completed);
1566 	copy_bio_stat(&stats->bios_page_cache_completed,
1567 		      &vdo->stats.bios_page_cache_completed);
1568 	copy_bio_stat(&stats->bios_acknowledged, &vdo->stats.bios_acknowledged);
1569 	copy_bio_stat(&stats->bios_acknowledged_partial, &vdo->stats.bios_acknowledged_partial);
1570 	stats->bios_in_progress =
1571 		subtract_bio_stats(stats->bios_in, stats->bios_acknowledged);
1572 	vdo_get_memory_stats(&stats->memory_usage.bytes_used,
1573 			     &stats->memory_usage.peak_bytes_used);
1574 }
1575 
1576 /**
1577  * vdo_fetch_statistics_callback() - Action to populate a vdo_statistics
1578  *                                   structure on the admin thread.
1579  * @completion: The completion.
1580  *
1581  * This callback is registered in vdo_fetch_statistics().
1582  */
1583 static void vdo_fetch_statistics_callback(struct vdo_completion *completion)
1584 {
1585 	get_vdo_statistics(completion->vdo, completion->parent);
1586 	complete_synchronous_action(completion);
1587 }
1588 
1589 /**
1590  * vdo_fetch_statistics() - Fetch statistics on the correct thread.
1591  * @vdo: The vdo.
1592  * @stats: The vdo statistics are returned here.
1593  */
1594 void vdo_fetch_statistics(struct vdo *vdo, struct vdo_statistics *stats)
1595 {
1596 	perform_synchronous_action(vdo, vdo_fetch_statistics_callback,
1597 				   vdo->thread_config.admin_thread, stats);
1598 }
1599 
1600 /**
1601  * vdo_get_callback_thread_id() - Get the id of the callback thread on which a completion is
1602  *                                currently running.
1603  *
1604  * Return: The current thread ID, or -1 if no such thread.
1605  */
1606 thread_id_t vdo_get_callback_thread_id(void)
1607 {
1608 	struct vdo_work_queue *queue = vdo_get_current_work_queue();
1609 	struct vdo_thread *thread;
1610 	thread_id_t thread_id;
1611 
1612 	if (queue == NULL)
1613 		return VDO_INVALID_THREAD_ID;
1614 
1615 	thread = vdo_get_work_queue_owner(queue);
1616 	thread_id = thread->thread_id;
1617 
1618 	if (PARANOID_THREAD_CONSISTENCY_CHECKS) {
1619 		BUG_ON(thread_id >= thread->vdo->thread_config.thread_count);
1620 		BUG_ON(thread != &thread->vdo->threads[thread_id]);
1621 	}
1622 
1623 	return thread_id;
1624 }
1625 
1626 /**
1627  * vdo_dump_status() - Dump status information about a vdo to the log for debugging.
1628  * @vdo: The vdo to dump.
1629  */
1630 void vdo_dump_status(const struct vdo *vdo)
1631 {
1632 	zone_count_t zone;
1633 
1634 	vdo_dump_flusher(vdo->flusher);
1635 	vdo_dump_recovery_journal_statistics(vdo->recovery_journal);
1636 	vdo_dump_packer(vdo->packer);
1637 	vdo_dump_slab_depot(vdo->depot);
1638 
1639 	for (zone = 0; zone < vdo->thread_config.logical_zone_count; zone++)
1640 		vdo_dump_logical_zone(&vdo->logical_zones->zones[zone]);
1641 
1642 	for (zone = 0; zone < vdo->thread_config.physical_zone_count; zone++)
1643 		vdo_dump_physical_zone(&vdo->physical_zones->zones[zone]);
1644 
1645 	vdo_dump_hash_zones(vdo->hash_zones);
1646 }
1647 
1648 /**
1649  * vdo_assert_on_admin_thread() - Assert that we are running on the admin thread.
1650  * @vdo: The vdo.
1651  * @name: The name of the function which should be running on the admin thread (for logging).
1652  */
1653 void vdo_assert_on_admin_thread(const struct vdo *vdo, const char *name)
1654 {
1655 	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.admin_thread),
1656 			    "%s called on admin thread", name);
1657 }
1658 
1659 /**
1660  * vdo_assert_on_logical_zone_thread() - Assert that this function was called on the specified
1661  *                                       logical zone thread.
1662  * @vdo: The vdo.
1663  * @logical_zone: The number of the logical zone.
1664  * @name: The name of the calling function.
1665  */
1666 void vdo_assert_on_logical_zone_thread(const struct vdo *vdo, zone_count_t logical_zone,
1667 				       const char *name)
1668 {
1669 	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() ==
1670 			     vdo->thread_config.logical_threads[logical_zone]),
1671 			    "%s called on logical thread", name);
1672 }
1673 
1674 /**
1675  * vdo_assert_on_physical_zone_thread() - Assert that this function was called on the specified
1676  *                                        physical zone thread.
1677  * @vdo: The vdo.
1678  * @physical_zone: The number of the physical zone.
1679  * @name: The name of the calling function.
1680  */
1681 void vdo_assert_on_physical_zone_thread(const struct vdo *vdo,
1682 					zone_count_t physical_zone, const char *name)
1683 {
1684 	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() ==
1685 			     vdo->thread_config.physical_threads[physical_zone]),
1686 			    "%s called on physical thread", name);
1687 }
1688 
1689 /**
1690  * vdo_get_physical_zone() - Get the physical zone responsible for a given physical block number.
1691  * @vdo: The vdo containing the physical zones.
1692  * @pbn: The PBN of the data block.
1693  * @zone_ptr: A pointer to return the physical zone.
1694  *
1695  * Gets the physical zone responsible for a given physical block number of a data block in this vdo
1696  * instance, or of the zero block (for which a NULL zone is returned). For any other block number
1697  * that is not in the range of valid data block numbers in any slab, an error will be returned.
1698  * This function is safe to call on invalid block numbers; it will not put the vdo into read-only
1699  * mode.
1700  *
1701  * Return: VDO_SUCCESS or VDO_OUT_OF_RANGE if the block number is invalid or an error code for any
1702  *         other failure.
1703  */
1704 int vdo_get_physical_zone(const struct vdo *vdo, physical_block_number_t pbn,
1705 			  struct physical_zone **zone_ptr)
1706 {
1707 	struct vdo_slab *slab;
1708 	int result;
1709 
1710 	if (pbn == VDO_ZERO_BLOCK) {
1711 		*zone_ptr = NULL;
1712 		return VDO_SUCCESS;
1713 	}
1714 
1715 	/*
1716 	 * Used because it does a more restrictive bounds check than vdo_get_slab(), and done first
1717 	 * because it won't trigger read-only mode on an invalid PBN.
1718 	 */
1719 	if (!vdo_is_physical_data_block(vdo->depot, pbn))
1720 		return VDO_OUT_OF_RANGE;
1721 
1722 	/* With the PBN already checked, we should always succeed in finding a slab. */
1723 	slab = vdo_get_slab(vdo->depot, pbn);
1724 	result = VDO_ASSERT(slab != NULL, "vdo_get_slab must succeed on all valid PBNs");
1725 	if (result != VDO_SUCCESS)
1726 		return result;
1727 
1728 	*zone_ptr = &vdo->physical_zones->zones[slab->allocator->zone_number];
1729 	return VDO_SUCCESS;
1730 }
1731