xref: /linux/drivers/md/dm-vdo/dm-vdo-target.c (revision a1cf2bd5b6424ead3d75d09c822f665907094a80)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include <linux/atomic.h>
7 #include <linux/bitops.h>
8 #include <linux/completion.h>
9 #include <linux/delay.h>
10 #include <linux/device-mapper.h>
11 #include <linux/err.h>
12 #include <linux/log2.h>
13 #include <linux/module.h>
14 #include <linux/mutex.h>
15 #include <linux/spinlock.h>
16 
17 #include "admin-state.h"
18 #include "block-map.h"
19 #include "completion.h"
20 #include "constants.h"
21 #include "data-vio.h"
22 #include "dedupe.h"
23 #include "dump.h"
24 #include "encodings.h"
25 #include "errors.h"
26 #include "flush.h"
27 #include "io-submitter.h"
28 #include "logger.h"
29 #include "memory-alloc.h"
30 #include "message-stats.h"
31 #include "recovery-journal.h"
32 #include "repair.h"
33 #include "slab-depot.h"
34 #include "status-codes.h"
35 #include "string-utils.h"
36 #include "thread-device.h"
37 #include "thread-registry.h"
38 #include "thread-utils.h"
39 #include "types.h"
40 #include "vdo.h"
41 #include "vio.h"
42 
/*
 * The phases of every admin operation (load, suspend, resume, grow, etc.), grouped by
 * operation. This enum must be kept in the same order as the ADMIN_PHASE_NAMES array
 * below, which is indexed by these values.
 */
enum admin_phases {
	GROW_LOGICAL_PHASE_START,
	GROW_LOGICAL_PHASE_GROW_BLOCK_MAP,
	GROW_LOGICAL_PHASE_END,
	GROW_LOGICAL_PHASE_ERROR,
	GROW_PHYSICAL_PHASE_START,
	GROW_PHYSICAL_PHASE_COPY_SUMMARY,
	GROW_PHYSICAL_PHASE_UPDATE_COMPONENTS,
	GROW_PHYSICAL_PHASE_USE_NEW_SLABS,
	GROW_PHYSICAL_PHASE_END,
	GROW_PHYSICAL_PHASE_ERROR,
	LOAD_PHASE_START,
	LOAD_PHASE_LOAD_DEPOT,
	LOAD_PHASE_MAKE_DIRTY,
	LOAD_PHASE_PREPARE_TO_ALLOCATE,
	LOAD_PHASE_SCRUB_SLABS,
	LOAD_PHASE_DATA_REDUCTION,
	LOAD_PHASE_FINISHED,
	LOAD_PHASE_DRAIN_JOURNAL,
	LOAD_PHASE_WAIT_FOR_READ_ONLY,
	PRE_LOAD_PHASE_START,
	PRE_LOAD_PHASE_FORMAT_START,
	PRE_LOAD_PHASE_FORMAT_SUPER,
	PRE_LOAD_PHASE_FORMAT_GEOMETRY,
	PRE_LOAD_PHASE_FORMAT_END,
	PRE_LOAD_PHASE_LOAD_SUPER,
	PRE_LOAD_PHASE_LOAD_COMPONENTS,
	PRE_LOAD_PHASE_END,
	PREPARE_GROW_PHYSICAL_PHASE_START,
	RESUME_PHASE_START,
	RESUME_PHASE_ALLOW_READ_ONLY_MODE,
	RESUME_PHASE_DEDUPE,
	RESUME_PHASE_DEPOT,
	RESUME_PHASE_JOURNAL,
	RESUME_PHASE_BLOCK_MAP,
	RESUME_PHASE_LOGICAL_ZONES,
	RESUME_PHASE_PACKER,
	RESUME_PHASE_FLUSHER,
	RESUME_PHASE_DATA_VIOS,
	RESUME_PHASE_END,
	SUSPEND_PHASE_START,
	SUSPEND_PHASE_PACKER,
	SUSPEND_PHASE_DATA_VIOS,
	SUSPEND_PHASE_DEDUPE,
	SUSPEND_PHASE_FLUSHES,
	SUSPEND_PHASE_LOGICAL_ZONES,
	SUSPEND_PHASE_BLOCK_MAP,
	SUSPEND_PHASE_JOURNAL,
	SUSPEND_PHASE_DEPOT,
	SUSPEND_PHASE_READ_ONLY_WAIT,
	SUSPEND_PHASE_WRITE_SUPER_BLOCK,
	SUSPEND_PHASE_END,
};
96 
/*
 * Printable names for the admin phases, indexed by enum admin_phases. The order here
 * must exactly match the enum above.
 */
static const char * const ADMIN_PHASE_NAMES[] = {
	"GROW_LOGICAL_PHASE_START",
	"GROW_LOGICAL_PHASE_GROW_BLOCK_MAP",
	"GROW_LOGICAL_PHASE_END",
	"GROW_LOGICAL_PHASE_ERROR",
	"GROW_PHYSICAL_PHASE_START",
	"GROW_PHYSICAL_PHASE_COPY_SUMMARY",
	"GROW_PHYSICAL_PHASE_UPDATE_COMPONENTS",
	"GROW_PHYSICAL_PHASE_USE_NEW_SLABS",
	"GROW_PHYSICAL_PHASE_END",
	"GROW_PHYSICAL_PHASE_ERROR",
	"LOAD_PHASE_START",
	"LOAD_PHASE_LOAD_DEPOT",
	"LOAD_PHASE_MAKE_DIRTY",
	"LOAD_PHASE_PREPARE_TO_ALLOCATE",
	"LOAD_PHASE_SCRUB_SLABS",
	"LOAD_PHASE_DATA_REDUCTION",
	"LOAD_PHASE_FINISHED",
	"LOAD_PHASE_DRAIN_JOURNAL",
	"LOAD_PHASE_WAIT_FOR_READ_ONLY",
	"PRE_LOAD_PHASE_START",
	"PRE_LOAD_PHASE_FORMAT_START",
	"PRE_LOAD_PHASE_FORMAT_SUPER",
	"PRE_LOAD_PHASE_FORMAT_GEOMETRY",
	"PRE_LOAD_PHASE_FORMAT_END",
	"PRE_LOAD_PHASE_LOAD_SUPER",
	"PRE_LOAD_PHASE_LOAD_COMPONENTS",
	"PRE_LOAD_PHASE_END",
	"PREPARE_GROW_PHYSICAL_PHASE_START",
	"RESUME_PHASE_START",
	"RESUME_PHASE_ALLOW_READ_ONLY_MODE",
	"RESUME_PHASE_DEDUPE",
	"RESUME_PHASE_DEPOT",
	"RESUME_PHASE_JOURNAL",
	"RESUME_PHASE_BLOCK_MAP",
	"RESUME_PHASE_LOGICAL_ZONES",
	"RESUME_PHASE_PACKER",
	"RESUME_PHASE_FLUSHER",
	"RESUME_PHASE_DATA_VIOS",
	"RESUME_PHASE_END",
	"SUSPEND_PHASE_START",
	"SUSPEND_PHASE_PACKER",
	"SUSPEND_PHASE_DATA_VIOS",
	"SUSPEND_PHASE_DEDUPE",
	"SUSPEND_PHASE_FLUSHES",
	"SUSPEND_PHASE_LOGICAL_ZONES",
	"SUSPEND_PHASE_BLOCK_MAP",
	"SUSPEND_PHASE_JOURNAL",
	"SUSPEND_PHASE_DEPOT",
	"SUSPEND_PHASE_READ_ONLY_WAIT",
	"SUSPEND_PHASE_WRITE_SUPER_BLOCK",
	"SUSPEND_PHASE_END",
};
150 
/* If we bump this, update the arrays below */
#define TABLE_VERSION 4

/* arrays for handling different table versions */
/* Indexed by table version; V0 and V1 must match this exactly (see get_version_number). */
static const u8 REQUIRED_ARGC[] = { 10, 12, 9, 7, 6 };
/* pool name no longer used. only here for verification of older versions */
/* Indexed by table version; only consulted for versions <= 2 when parsing a table line. */
static const u8 POOL_NAME_ARG_INDEX[] = { 8, 10, 8 };
158 
/*
 * Track in-use instance numbers using a flat bit array.
 *
 * O(n) run time isn't ideal, but if we have 1000 VDO devices in use simultaneously we still only
 * need to scan 16 words, so it's not likely to be a big deal compared to other resource usage.
 */

/*
 * This minimum size for the bit array creates a numbering space of 0-999, which allows
 * successive starts of the same volume to have different instance numbers in any
 * reasonably-sized test. Changing instances on restart allows vdoMonReport to detect that
 * the ephemeral stats have reset to zero.
 */
#define BIT_COUNT_MINIMUM 1000
/* Grow the bit array by this many bits when needed */
#define BIT_COUNT_INCREMENT 100

struct instance_tracker {
	/* Total number of bits (instance numbers) currently representable */
	unsigned int bit_count;
	/* The bit array itself, one bit per instance number */
	unsigned long *words;
	/* NOTE(review): presumably the number of instances currently in use; allocator not visible in this chunk */
	unsigned int count;
	/* NOTE(review): presumably a hint for where to start the next free-bit search; confirm against the allocator */
	unsigned int next;
};

/* NOTE(review): presumably guards all access to 'instances'; lock usage is not visible in this chunk. */
static DEFINE_MUTEX(instances_lock);
static struct instance_tracker instances;
185 
186 /**
187  * free_device_config() - Free a device config created by parse_device_config().
188  * @config: The config to free.
189  */
190 static void free_device_config(struct device_config *config)
191 {
192 	if (config == NULL)
193 		return;
194 
195 	if (config->owned_device != NULL)
196 		dm_put_device(config->owning_target, config->owned_device);
197 
198 	vdo_free(config->parent_device_name);
199 	vdo_free(config->original_string);
200 
201 	/* Reduce the chance a use-after-free (as in BZ 1669960) happens to work. */
202 	memset(config, 0, sizeof(*config));
203 	vdo_free(config);
204 }
205 
206 /**
207  * get_version_number() - Decide the version number from argv.
208  *
209  * @argc: The number of table values.
210  * @argv: The array of table values.
211  * @error_ptr: A pointer to return a error string in.
212  * @version_ptr: A pointer to return the version.
213  *
214  * Return: VDO_SUCCESS or an error code.
215  */
216 static int get_version_number(int argc, char **argv, char **error_ptr,
217 			      unsigned int *version_ptr)
218 {
219 	/* version, if it exists, is in a form of V<n> */
220 	if (sscanf(argv[0], "V%u", version_ptr) == 1) {
221 		if (*version_ptr < 1 || *version_ptr > TABLE_VERSION) {
222 			*error_ptr = "Unknown version number detected";
223 			return VDO_BAD_CONFIGURATION;
224 		}
225 	} else {
226 		/* V0 actually has no version number in the table string */
227 		*version_ptr = 0;
228 	}
229 
230 	/*
231 	 * V0 and V1 have no optional parameters. There will always be a parameter for thread
232 	 * config, even if it's a "." to show it's an empty list.
233 	 */
234 	if (*version_ptr <= 1) {
235 		if (argc != REQUIRED_ARGC[*version_ptr]) {
236 			*error_ptr = "Incorrect number of arguments for version";
237 			return VDO_BAD_CONFIGURATION;
238 		}
239 	} else if (argc < REQUIRED_ARGC[*version_ptr]) {
240 		*error_ptr = "Incorrect number of arguments for version";
241 		return VDO_BAD_CONFIGURATION;
242 	}
243 
244 	if (*version_ptr != TABLE_VERSION) {
245 		vdo_log_warning("Detected version mismatch between kernel module and tools kernel: %d, tool: %d",
246 				TABLE_VERSION, *version_ptr);
247 		vdo_log_warning("Please consider upgrading management tools to match kernel.");
248 	}
249 	return VDO_SUCCESS;
250 }
251 
252 /* Free a list of non-NULL string pointers, and then the list itself. */
253 static void free_string_array(char **string_array)
254 {
255 	unsigned int offset;
256 
257 	for (offset = 0; string_array[offset] != NULL; offset++)
258 		vdo_free(string_array[offset]);
259 	vdo_free(string_array);
260 }
261 
262 /*
263  * Split the input string into substrings, separated at occurrences of the indicated character,
264  * returning a null-terminated list of string pointers.
265  *
266  * The string pointers and the pointer array itself should both be freed with vdo_free() when no
267  * longer needed. This can be done with vdo_free_string_array (below) if the pointers in the array
268  * are not changed. Since the array and copied strings are allocated by this function, it may only
269  * be used in contexts where allocation is permitted.
270  *
271  * Empty substrings are not ignored; that is, returned substrings may be empty strings if the
272  * separator occurs twice in a row.
273  */
274 static int split_string(const char *string, char separator, char ***substring_array_ptr)
275 {
276 	unsigned int current_substring = 0, substring_count = 1;
277 	const char *s;
278 	char **substrings;
279 	int result;
280 	ptrdiff_t length;
281 
282 	for (s = string; *s != 0; s++) {
283 		if (*s == separator)
284 			substring_count++;
285 	}
286 
287 	result = vdo_allocate(substring_count + 1, "string-splitting array", &substrings);
288 	if (result != VDO_SUCCESS)
289 		return result;
290 
291 	for (s = string; *s != 0; s++) {
292 		if (*s == separator) {
293 			ptrdiff_t length = s - string;
294 
295 			result = vdo_allocate(length + 1, "split string",
296 					      &substrings[current_substring]);
297 			if (result != VDO_SUCCESS) {
298 				free_string_array(substrings);
299 				return result;
300 			}
301 			/*
302 			 * Trailing NUL is already in place after allocation; deal with the zero or
303 			 * more non-NUL bytes in the string.
304 			 */
305 			if (length > 0)
306 				memcpy(substrings[current_substring], string, length);
307 			string = s + 1;
308 			current_substring++;
309 			BUG_ON(current_substring >= substring_count);
310 		}
311 	}
312 	/* Process final string, with no trailing separator. */
313 	BUG_ON(current_substring != (substring_count - 1));
314 	length = strlen(string);
315 
316 	result = vdo_allocate(length + 1, "split string", &substrings[current_substring]);
317 	if (result != VDO_SUCCESS) {
318 		free_string_array(substrings);
319 		return result;
320 	}
321 	memcpy(substrings[current_substring], string, length);
322 	current_substring++;
323 	/* substrings[current_substring] is NULL already */
324 	*substring_array_ptr = substrings;
325 	return VDO_SUCCESS;
326 }
327 
328 /*
329  * Join the input substrings into one string, joined with the indicated character, returning a
330  * string. array_length is a bound on the number of valid elements in substring_array, in case it
331  * is not NULL-terminated.
332  */
/*
 * Join the input substrings into one string, joined with the indicated character, returning a
 * string. array_length is a bound on the number of valid elements in substring_array, in case it
 * is not NULL-terminated.
 */
static int join_strings(char **substring_array, size_t array_length, char separator,
			char **string_ptr)
{
	size_t string_length = 0;
	size_t i;
	int result;
	char *output, *current_position;

	/* One byte per substring character, plus one per separator; the final separator
	 * slot doubles as the terminating NUL below. */
	for (i = 0; (i < array_length) && (substring_array[i] != NULL); i++)
		string_length += strlen(substring_array[i]) + 1;

	result = vdo_allocate(string_length, __func__, &output);
	if (result != VDO_SUCCESS)
		return result;

	current_position = &output[0];

	/*
	 * NOTE(review): vdo_append_to_buffer() is assumed to write the formatted text bounded
	 * by its second argument and return a pointer just past what it wrote -- confirm
	 * against its definition in string-utils.h.
	 */
	for (i = 0; (i < array_length) && (substring_array[i] != NULL); i++) {
		current_position = vdo_append_to_buffer(current_position,
							output + string_length, "%s",
							substring_array[i]);
		*current_position = separator;
		current_position++;
	}

	/* We output one too many separators; replace the last with a zero byte. */
	if (current_position != output)
		*(current_position - 1) = '\0';

	/*
	 * NOTE(review): with zero input strings the buffer is zero-length and carries no NUL;
	 * the visible caller always passes argc >= 1, but verify any new callers do too.
	 */
	*string_ptr = output;
	return VDO_SUCCESS;
}
365 
366 /**
367  * parse_bool() - Parse a two-valued option into a bool.
368  * @bool_str: The string value to convert to a bool.
369  * @true_str: The string value which should be converted to true.
370  * @false_str: The string value which should be converted to false.
371  * @bool_ptr: A pointer to return the bool value in.
372  *
373  * Return: VDO_SUCCESS or an error if bool_str is neither true_str nor false_str.
374  */
375 static inline int __must_check parse_bool(const char *bool_str, const char *true_str,
376 					  const char *false_str, bool *bool_ptr)
377 {
378 	bool value = false;
379 
380 	if (strcmp(bool_str, true_str) == 0)
381 		value = true;
382 	else if (strcmp(bool_str, false_str) == 0)
383 		value = false;
384 	else
385 		return VDO_BAD_CONFIGURATION;
386 
387 	*bool_ptr = value;
388 	return VDO_SUCCESS;
389 }
390 
391 /**
392  * parse_memory() - Parse a string into an index memory value.
393  * @memory_str: The string value to convert to a memory value.
394  * @memory_ptr: A pointer to return the memory value in.
395  *
396  * Return: VDO_SUCCESS or an error
397  */
398 static int __must_check parse_memory(const char *memory_str,
399 				     uds_memory_config_size_t *memory_ptr)
400 {
401 	uds_memory_config_size_t memory;
402 
403 	if (strcmp(memory_str, "0.25") == 0) {
404 		memory = UDS_MEMORY_CONFIG_256MB;
405 	} else if ((strcmp(memory_str, "0.5") == 0) || (strcmp(memory_str, "0.50") == 0)) {
406 		memory = UDS_MEMORY_CONFIG_512MB;
407 	} else if (strcmp(memory_str, "0.75") == 0) {
408 		memory = UDS_MEMORY_CONFIG_768MB;
409 	} else {
410 		unsigned int value;
411 		int result;
412 
413 		result = kstrtouint(memory_str, 10, &value);
414 		if (result) {
415 			vdo_log_error("optional parameter error: invalid memory size, must be a positive integer");
416 			return -EINVAL;
417 		}
418 
419 		if (value > UDS_MEMORY_CONFIG_MAX) {
420 			vdo_log_error("optional parameter error: invalid memory size, must not be greater than %d",
421 				      UDS_MEMORY_CONFIG_MAX);
422 			return -EINVAL;
423 		}
424 
425 		memory = value;
426 	}
427 
428 	*memory_ptr = memory;
429 	return VDO_SUCCESS;
430 }
431 
432 /**
433  * parse_slab_size() - Parse a string option into a slab size value.
434  * @slab_str: The string value representing slab size.
435  * @slab_size_ptr: A pointer to return the slab size in.
436  *
437  * Return: VDO_SUCCESS or an error
438  */
439 static int __must_check parse_slab_size(const char *slab_str, block_count_t *slab_size_ptr)
440 {
441 	block_count_t value;
442 	int result;
443 
444 	result = kstrtoull(slab_str, 10, &value);
445 	if (result) {
446 		vdo_log_error("optional parameter error: invalid slab size, must be a postive integer");
447 		return -EINVAL;
448 	}
449 
450 	if (value < MIN_VDO_SLAB_BLOCKS || value > MAX_VDO_SLAB_BLOCKS || (!is_power_of_2(value))) {
451 		vdo_log_error("optional parameter error: invalid slab size, must be a power of two between %u and %u",
452 			      MIN_VDO_SLAB_BLOCKS, MAX_VDO_SLAB_BLOCKS);
453 		return -EINVAL;
454 	}
455 
456 	*slab_size_ptr = value;
457 	return VDO_SUCCESS;
458 }
459 
460 /**
461  * process_one_thread_config_spec() - Process one component of a thread parameter configuration
462  *				      string and update the configuration data structure.
463  * @thread_param_type: The type of thread specified.
464  * @count: The thread count requested.
465  * @config: The configuration data structure to update.
466  *
467  * If the thread count requested is invalid, a message is logged and -EINVAL returned. If the
468  * thread name is unknown, a message is logged but no error is returned.
469  *
470  * Return: VDO_SUCCESS or -EINVAL
471  */
/**
 * process_one_thread_config_spec() - Process one component of a thread parameter configuration
 *				      string and update the configuration data structure.
 * @thread_param_type: The type of thread specified.
 * @count: The thread count requested.
 * @config: The configuration data structure to update.
 *
 * If the thread count requested is invalid, a message is logged and -EINVAL returned. If the
 * thread name is unknown, a message is logged but no error is returned.
 *
 * Return: VDO_SUCCESS or -EINVAL
 */
static int process_one_thread_config_spec(const char *thread_param_type,
					  unsigned int count,
					  struct thread_count_config *config)
{
	/* Handle limited thread parameters */
	/* bioRotationInterval has its own range check, separate from the generic thread limit. */
	if (strcmp(thread_param_type, "bioRotationInterval") == 0) {
		if (count == 0) {
			vdo_log_error("thread config string error:  'bioRotationInterval' of at least 1 is required");
			return -EINVAL;
		} else if (count > VDO_BIO_ROTATION_INTERVAL_LIMIT) {
			vdo_log_error("thread config string error: 'bioRotationInterval' cannot be higher than %d",
				      VDO_BIO_ROTATION_INTERVAL_LIMIT);
			return -EINVAL;
		}
		config->bio_rotation_interval = count;
		return VDO_SUCCESS;
	}
	/* Zone counts have their own per-type maxima and allow zero. */
	if (strcmp(thread_param_type, "logical") == 0) {
		if (count > MAX_VDO_LOGICAL_ZONES) {
			vdo_log_error("thread config string error: at most %d 'logical' threads are allowed",
				      MAX_VDO_LOGICAL_ZONES);
			return -EINVAL;
		}
		config->logical_zones = count;
		return VDO_SUCCESS;
	}
	if (strcmp(thread_param_type, "physical") == 0) {
		if (count > MAX_VDO_PHYSICAL_ZONES) {
			vdo_log_error("thread config string error: at most %d 'physical' threads are allowed",
				      MAX_VDO_PHYSICAL_ZONES);
			return -EINVAL;
		}
		config->physical_zones = count;
		return VDO_SUCCESS;
	}
	/* Handle other thread count parameters */
	/* Remaining recognized types share the generic MAXIMUM_VDO_THREADS cap. */
	if (count > MAXIMUM_VDO_THREADS) {
		vdo_log_error("thread config string error: at most %d '%s' threads are allowed",
			      MAXIMUM_VDO_THREADS, thread_param_type);
		return -EINVAL;
	}
	if (strcmp(thread_param_type, "hash") == 0) {
		config->hash_zones = count;
		return VDO_SUCCESS;
	}
	/* "cpu" and "bio" must be at least 1; "ack" may be zero. */
	if (strcmp(thread_param_type, "cpu") == 0) {
		if (count == 0) {
			vdo_log_error("thread config string error: at least one 'cpu' thread required");
			return -EINVAL;
		}
		config->cpu_threads = count;
		return VDO_SUCCESS;
	}
	if (strcmp(thread_param_type, "ack") == 0) {
		config->bio_ack_threads = count;
		return VDO_SUCCESS;
	}
	if (strcmp(thread_param_type, "bio") == 0) {
		if (count == 0) {
			vdo_log_error("thread config string error: at least one 'bio' thread required");
			return -EINVAL;
		}
		config->bio_threads = count;
		return VDO_SUCCESS;
	}

	/*
	 * Don't fail, just log. This will handle version mismatches between user mode tools and
	 * kernel.
	 */
	vdo_log_info("unknown thread parameter type \"%s\"", thread_param_type);
	return VDO_SUCCESS;
}
545 
546 /**
547  * parse_one_thread_config_spec() - Parse one component of a thread parameter configuration string
548  *				    and update the configuration data structure.
549  * @spec: The thread parameter specification string.
550  * @config: The configuration data to be updated.
551  */
552 static int parse_one_thread_config_spec(const char *spec,
553 					struct thread_count_config *config)
554 {
555 	unsigned int count;
556 	char **fields;
557 	int result;
558 
559 	result = split_string(spec, '=', &fields);
560 	if (result != VDO_SUCCESS)
561 		return result;
562 
563 	if ((fields[0] == NULL) || (fields[1] == NULL) || (fields[2] != NULL)) {
564 		vdo_log_error("thread config string error: expected thread parameter assignment, saw \"%s\"",
565 			      spec);
566 		free_string_array(fields);
567 		return -EINVAL;
568 	}
569 
570 	result = kstrtouint(fields[1], 10, &count);
571 	if (result) {
572 		vdo_log_error("thread config string error: integer value needed, found \"%s\"",
573 			      fields[1]);
574 		free_string_array(fields);
575 		return result;
576 	}
577 
578 	result = process_one_thread_config_spec(fields[0], count, config);
579 	free_string_array(fields);
580 	return result;
581 }
582 
583 /**
584  * parse_thread_config_string() - Parse the configuration string passed and update the specified
585  *				  counts and other parameters of various types of threads to be
586  *				  created.
587  * @string: Thread parameter configuration string.
588  * @config: The thread configuration data to update.
589  *
590  * The configuration string should contain one or more comma-separated specs of the form
591  * "typename=number"; the supported type names are "cpu", "ack", "bio", "bioRotationInterval",
592  * "logical", "physical", and "hash".
593  *
594  * If an error occurs during parsing of a single key/value pair, we deem it serious enough to stop
595  * further parsing.
596  *
597  * This function can't set the "reason" value the caller wants to pass back, because we'd want to
598  * format it to say which field was invalid, and we can't allocate the "reason" strings
599  * dynamically. So if an error occurs, we'll log the details and pass back an error.
600  *
601  * Return: VDO_SUCCESS or -EINVAL or -ENOMEM
602  */
603 static int parse_thread_config_string(const char *string,
604 				      struct thread_count_config *config)
605 {
606 	int result = VDO_SUCCESS;
607 	char **specs;
608 
609 	if (strcmp(".", string) != 0) {
610 		unsigned int i;
611 
612 		result = split_string(string, ',', &specs);
613 		if (result != VDO_SUCCESS)
614 			return result;
615 
616 		for (i = 0; specs[i] != NULL; i++) {
617 			result = parse_one_thread_config_spec(specs[i], config);
618 			if (result != VDO_SUCCESS)
619 				break;
620 		}
621 		free_string_array(specs);
622 	}
623 	return result;
624 }
625 
626 /**
627  * process_one_key_value_pair() - Process one component of an optional parameter string and update
628  *				  the configuration data structure.
629  * @key: The optional parameter key name.
630  * @value: The optional parameter value.
631  * @config: The configuration data structure to update.
632  *
633  * If the value requested is invalid, a message is logged and -EINVAL returned. If the key is
634  * unknown, a message is logged but no error is returned.
635  *
636  * Return: VDO_SUCCESS or -EINVAL
637  */
638 static int process_one_key_value_pair(const char *key, unsigned int value,
639 				      struct device_config *config)
640 {
641 	/* Non thread optional parameters */
642 	if (strcmp(key, "maxDiscard") == 0) {
643 		if (value == 0) {
644 			vdo_log_error("optional parameter error: at least one max discard block required");
645 			return -EINVAL;
646 		}
647 		/* Max discard sectors in blkdev_issue_discard is UINT_MAX >> 9 */
648 		if (value > (UINT_MAX / VDO_BLOCK_SIZE)) {
649 			vdo_log_error("optional parameter error: at most %d max discard blocks are allowed",
650 				      UINT_MAX / VDO_BLOCK_SIZE);
651 			return -EINVAL;
652 		}
653 		config->max_discard_blocks = value;
654 		return VDO_SUCCESS;
655 	}
656 	/* Handles unknown key names */
657 	return process_one_thread_config_spec(key, value, &config->thread_counts);
658 }
659 
660 /**
661  * parse_one_key_value_pair() - Parse one key/value pair and update the configuration data
662  *				structure.
663  * @key: The optional key name.
664  * @value: The optional value.
665  * @config: The configuration data to be updated.
666  *
667  * Return: VDO_SUCCESS or error.
668  */
/**
 * parse_one_key_value_pair() - Parse one key/value pair and update the configuration data
 *				structure.
 * @key: The optional key name.
 * @value: The optional value.
 * @config: The configuration data to be updated.
 *
 * Return: VDO_SUCCESS or error.
 */
static int parse_one_key_value_pair(const char *key, const char *value,
				    struct device_config *config)
{
	unsigned int count;
	int result;

	/* Keys with non-integer values are dispatched to dedicated parsers first. */
	if (strcmp(key, "deduplication") == 0)
		return parse_bool(value, "on", "off", &config->deduplication);

	if (strcmp(key, "compression") == 0)
		return parse_bool(value, "on", "off", &config->compression);

	if (strcmp(key, "indexSparse") == 0)
		return parse_bool(value, "on", "off", &config->index_sparse);

	if (strcmp(key, "indexMemory") == 0)
		return parse_memory(value, &config->index_memory);

	if (strcmp(key, "slabSize") == 0)
		return parse_slab_size(value, &config->slab_blocks);

	/* The remaining arguments must have non-negative integral values. */
	result = kstrtouint(value, 10, &count);
	if (result) {
		vdo_log_error("optional config string error: integer value needed, found \"%s\"",
			      value);
		return result;
	}
	/* Unknown keys are handled (logged, not failed) further down the chain. */
	return process_one_key_value_pair(key, count, config);
}
699 
700 /**
701  * parse_key_value_pairs() - Parse all key/value pairs from a list of arguments.
702  * @argc: The total number of arguments in list.
703  * @argv: The list of key/value pairs.
704  * @config: The device configuration data to update.
705  *
706  * If an error occurs during parsing of a single key/value pair, we deem it serious enough to stop
707  * further parsing.
708  *
709  * This function can't set the "reason" value the caller wants to pass back, because we'd want to
710  * format it to say which field was invalid, and we can't allocate the "reason" strings
711  * dynamically. So if an error occurs, we'll log the details and return the error.
712  *
713  * Return: VDO_SUCCESS or error
714  */
715 static int parse_key_value_pairs(int argc, char **argv, struct device_config *config)
716 {
717 	int result = VDO_SUCCESS;
718 
719 	while (argc) {
720 		result = parse_one_key_value_pair(argv[0], argv[1], config);
721 		if (result != VDO_SUCCESS)
722 			break;
723 
724 		argc -= 2;
725 		argv += 2;
726 	}
727 
728 	return result;
729 }
730 
731 /**
732  * parse_optional_arguments() - Parse the configuration string passed in for optional arguments.
733  * @arg_set: The structure holding the arguments to parse.
734  * @error_ptr: Pointer to a buffer to hold the error string.
735  * @config: Pointer to device configuration data to update.
736  *
737  * For V0/V1 configurations, there will only be one optional parameter; the thread configuration.
738  * The configuration string should contain one or more comma-separated specs of the form
739  * "typename=number"; the supported type names are "cpu", "ack", "bio", "bioRotationInterval",
740  * "logical", "physical", and "hash".
741  *
742  * For V2 configurations and beyond, there could be any number of arguments. They should contain
743  * one or more key/value pairs separated by a space.
744  *
745  * Return: VDO_SUCCESS or error
746  */
747 static int parse_optional_arguments(struct dm_arg_set *arg_set, char **error_ptr,
748 				    struct device_config *config)
749 {
750 	int result = VDO_SUCCESS;
751 
752 	if (config->version == 0 || config->version == 1) {
753 		result = parse_thread_config_string(arg_set->argv[0],
754 						    &config->thread_counts);
755 		if (result != VDO_SUCCESS) {
756 			*error_ptr = "Invalid thread-count configuration";
757 			return VDO_BAD_CONFIGURATION;
758 		}
759 	} else {
760 		if ((arg_set->argc % 2) != 0) {
761 			*error_ptr = "Odd number of optional arguments given but they should be <key> <value> pairs";
762 			return VDO_BAD_CONFIGURATION;
763 		}
764 		result = parse_key_value_pairs(arg_set->argc, arg_set->argv, config);
765 		if (result != VDO_SUCCESS) {
766 			*error_ptr = "Invalid optional argument configuration";
767 			return VDO_BAD_CONFIGURATION;
768 		}
769 	}
770 	return result;
771 }
772 
773 /**
774  * handle_parse_error() - Handle a parsing error.
775  * @config: The config to free.
776  * @error_ptr: A place to store a constant string about the error.
777  * @error_str: A constant string to store in error_ptr.
778  */
/**
 * handle_parse_error() - Handle a parsing error.
 * @config: The config to free; may be NULL.
 * @error_ptr: A place to store a constant string about the error.
 * @error_str: A constant string to store in error_ptr.
 */
static void handle_parse_error(struct device_config *config, char **error_ptr,
			       char *error_str)
{
	*error_ptr = error_str;
	free_device_config(config);
}
785 
786 /**
787  * parse_device_config() - Convert the dmsetup table into a struct device_config.
788  * @argc: The number of table values.
789  * @argv: The array of table values.
790  * @ti: The target structure for this table.
791  * @config_ptr: A pointer to return the allocated config.
792  *
793  * Return: VDO_SUCCESS or an error code.
794  */
795 static int parse_device_config(int argc, char **argv, struct dm_target *ti,
796 			       struct device_config **config_ptr)
797 {
798 	bool enable_512e;
799 	size_t logical_bytes = to_bytes(ti->len);
800 	struct dm_arg_set arg_set;
801 	char **error_ptr = &ti->error;
802 	struct device_config *config = NULL;
803 	int result;
804 
805 	if (logical_bytes > (MAXIMUM_VDO_LOGICAL_BLOCKS * VDO_BLOCK_SIZE)) {
806 		handle_parse_error(config, error_ptr,
807 				   "Logical size exceeds the maximum");
808 		return VDO_BAD_CONFIGURATION;
809 	}
810 
811 	if ((logical_bytes % VDO_BLOCK_SIZE) != 0) {
812 		handle_parse_error(config, error_ptr,
813 				   "Logical size must be a multiple of 4096");
814 		return VDO_BAD_CONFIGURATION;
815 	}
816 
817 	if (argc == 0) {
818 		handle_parse_error(config, error_ptr, "Incorrect number of arguments");
819 		return VDO_BAD_CONFIGURATION;
820 	}
821 
822 	result = vdo_allocate(1, "device_config", &config);
823 	if (result != VDO_SUCCESS) {
824 		handle_parse_error(config, error_ptr,
825 				   "Could not allocate config structure");
826 		return VDO_BAD_CONFIGURATION;
827 	}
828 
829 	config->owning_target = ti;
830 	config->logical_blocks = logical_bytes / VDO_BLOCK_SIZE;
831 	INIT_LIST_HEAD(&config->config_list);
832 
833 	/* Save the original string. */
834 	result = join_strings(argv, argc, ' ', &config->original_string);
835 	if (result != VDO_SUCCESS) {
836 		handle_parse_error(config, error_ptr, "Could not populate string");
837 		return VDO_BAD_CONFIGURATION;
838 	}
839 
840 	vdo_log_info("table line: %s", config->original_string);
841 
842 	config->thread_counts = (struct thread_count_config) {
843 		.bio_ack_threads = 1,
844 		.bio_threads = DEFAULT_VDO_BIO_SUBMIT_QUEUE_COUNT,
845 		.bio_rotation_interval = DEFAULT_VDO_BIO_SUBMIT_QUEUE_ROTATE_INTERVAL,
846 		.cpu_threads = 1,
847 		.logical_zones = 0,
848 		.physical_zones = 0,
849 		.hash_zones = 0,
850 	};
851 	config->max_discard_blocks = 1;
852 	config->deduplication = true;
853 	config->compression = false;
854 	config->index_memory = UDS_MEMORY_CONFIG_256MB;
855 	config->index_sparse = false;
856 	config->slab_blocks = DEFAULT_VDO_SLAB_BLOCKS;
857 
858 	arg_set.argc = argc;
859 	arg_set.argv = argv;
860 
861 	result = get_version_number(argc, argv, error_ptr, &config->version);
862 	if (result != VDO_SUCCESS) {
863 		/* get_version_number sets error_ptr itself. */
864 		handle_parse_error(config, error_ptr, *error_ptr);
865 		return result;
866 	}
867 	/* Move the arg pointer forward only if the argument was there. */
868 	if (config->version >= 1)
869 		dm_shift_arg(&arg_set);
870 
871 	result = vdo_duplicate_string(dm_shift_arg(&arg_set), "parent device name",
872 				      &config->parent_device_name);
873 	if (result != VDO_SUCCESS) {
874 		handle_parse_error(config, error_ptr,
875 				   "Could not copy parent device name");
876 		return VDO_BAD_CONFIGURATION;
877 	}
878 
879 	/* Get the physical blocks, if known. */
880 	if (config->version >= 1) {
881 		result = kstrtoull(dm_shift_arg(&arg_set), 10, &config->physical_blocks);
882 		if (result) {
883 			handle_parse_error(config, error_ptr,
884 					   "Invalid physical block count");
885 			return VDO_BAD_CONFIGURATION;
886 		}
887 	}
888 
889 	/* Get the logical block size and validate */
890 	result = parse_bool(dm_shift_arg(&arg_set), "512", "4096", &enable_512e);
891 	if (result != VDO_SUCCESS) {
892 		handle_parse_error(config, error_ptr, "Invalid logical block size");
893 		return VDO_BAD_CONFIGURATION;
894 	}
895 	config->logical_block_size = (enable_512e ? 512 : 4096);
896 
897 	/* Skip past the two no longer used read cache options. */
898 	if (config->version <= 1)
899 		dm_consume_args(&arg_set, 2);
900 
901 	/* Get the page cache size. */
902 	result = kstrtouint(dm_shift_arg(&arg_set), 10, &config->cache_size);
903 	if (result) {
904 		handle_parse_error(config, error_ptr,
905 				   "Invalid block map page cache size");
906 		return VDO_BAD_CONFIGURATION;
907 	}
908 
909 	/* Get the block map era length. */
910 	result = kstrtouint(dm_shift_arg(&arg_set), 10, &config->block_map_maximum_age);
911 	if (result) {
912 		handle_parse_error(config, error_ptr, "Invalid block map maximum age");
913 		return VDO_BAD_CONFIGURATION;
914 	}
915 
916 	/* Skip past the no longer used MD RAID5 optimization mode */
917 	if (config->version <= 2)
918 		dm_consume_args(&arg_set, 1);
919 
920 	/* Skip past the no longer used write policy setting */
921 	if (config->version <= 3)
922 		dm_consume_args(&arg_set, 1);
923 
924 	/* Skip past the no longer used pool name for older table lines */
925 	if (config->version <= 2) {
926 		/*
927 		 * Make sure the enum to get the pool name from argv directly is still in sync with
928 		 * the parsing of the table line.
929 		 */
930 		if (&arg_set.argv[0] != &argv[POOL_NAME_ARG_INDEX[config->version]]) {
931 			handle_parse_error(config, error_ptr,
932 					   "Pool name not in expected location");
933 			return VDO_BAD_CONFIGURATION;
934 		}
935 		dm_shift_arg(&arg_set);
936 	}
937 
938 	/* Get the optional arguments and validate. */
939 	result = parse_optional_arguments(&arg_set, error_ptr, config);
940 	if (result != VDO_SUCCESS) {
941 		/* parse_optional_arguments sets error_ptr itself. */
942 		handle_parse_error(config, error_ptr, *error_ptr);
943 		return result;
944 	}
945 
946 	/*
947 	 * Logical, physical, and hash zone counts can all be zero; then we get one thread doing
948 	 * everything, our older configuration. If any zone count is non-zero, the others must be
949 	 * as well.
950 	 */
951 	if (((config->thread_counts.logical_zones == 0) !=
952 	     (config->thread_counts.physical_zones == 0)) ||
953 	    ((config->thread_counts.physical_zones == 0) !=
954 	     (config->thread_counts.hash_zones == 0))) {
955 		handle_parse_error(config, error_ptr,
956 				   "Logical, physical, and hash zones counts must all be zero or all non-zero");
957 		return VDO_BAD_CONFIGURATION;
958 	}
959 
960 	if (config->cache_size <
961 	    (2 * MAXIMUM_VDO_USER_VIOS * config->thread_counts.logical_zones)) {
962 		handle_parse_error(config, error_ptr,
963 				   "Insufficient block map cache for logical zones");
964 		return VDO_BAD_CONFIGURATION;
965 	}
966 
967 	result = dm_get_device(ti, config->parent_device_name,
968 			       dm_table_get_mode(ti->table), &config->owned_device);
969 	if (result != 0) {
970 		vdo_log_error("couldn't open device \"%s\": error %d",
971 			      config->parent_device_name, result);
972 		handle_parse_error(config, error_ptr, "Unable to open storage device");
973 		return VDO_BAD_CONFIGURATION;
974 	}
975 
976 	if (config->version == 0) {
977 		u64 device_size = bdev_nr_bytes(config->owned_device->bdev);
978 
979 		config->physical_blocks = device_size / VDO_BLOCK_SIZE;
980 	}
981 
982 	*config_ptr = config;
983 	return result;
984 }
985 
986 static struct vdo *get_vdo_for_target(struct dm_target *ti)
987 {
988 	return ((struct device_config *) ti->private)->vdo;
989 }
990 
991 
/* Map an incoming bio: launch flushes directly, hand data bios to the data_vio pool. */
static int vdo_map_bio(struct dm_target *ti, struct bio *bio)
{
	struct vdo *vdo = get_vdo_for_target(ti);
	struct vdo_work_queue *current_work_queue;
	const struct admin_state_code *code = vdo_get_admin_state_code(&vdo->admin.state);

	VDO_ASSERT_LOG_ONLY(code->normal, "vdo should not receive bios while in state %s",
			    code->name);

	/* Count all incoming bios. */
	vdo_count_bios(&vdo->stats.bios_in, bio);


	/* Handle empty bios.  Empty flush bios are not associated with a vio. */
	if ((bio_op(bio) == REQ_OP_FLUSH) || ((bio->bi_opf & REQ_PREFLUSH) != 0)) {
		vdo_launch_flush(vdo, bio);
		return DM_MAPIO_SUBMITTED;
	}

	/*
	 * This could deadlock: launching a bio from one of this vdo's own work queue threads
	 * could block a thread the vdo needs in order to service the request, so assert (via
	 * BUG_ON) that we are not on one of them.
	 */
	current_work_queue = vdo_get_current_work_queue();
	BUG_ON((current_work_queue != NULL) &&
	       (vdo == vdo_get_work_queue_owner(current_work_queue)->vdo));
	vdo_launch_bio(vdo->data_vio_pool, bio);
	return DM_MAPIO_SUBMITTED;
}
1018 
1019 static void vdo_io_hints(struct dm_target *ti, struct queue_limits *limits)
1020 {
1021 	struct vdo *vdo = get_vdo_for_target(ti);
1022 
1023 	limits->logical_block_size = vdo->device_config->logical_block_size;
1024 	limits->physical_block_size = VDO_BLOCK_SIZE;
1025 
1026 	/* The minimum io size for random io */
1027 	limits->io_min = VDO_BLOCK_SIZE;
1028 	/* The optimal io size for streamed/sequential io */
1029 	limits->io_opt = VDO_BLOCK_SIZE;
1030 
1031 	/*
1032 	 * Sets the maximum discard size that will be passed into VDO. This value comes from a
1033 	 * table line value passed in during dmsetup create.
1034 	 *
1035 	 * The value 1024 is the largest usable value on HD systems. A 2048 sector discard on a
1036 	 * busy HD system takes 31 seconds. We should use a value no higher than 1024, which takes
1037 	 * 15 to 16 seconds on a busy HD system. However, using large values results in 120 second
1038 	 * blocked task warnings in kernel logs. In order to avoid these warnings, we choose to
1039 	 * use the smallest reasonable value.
1040 	 *
1041 	 * The value is used by dm-thin to determine whether to pass down discards. The block layer
1042 	 * splits large discards on this boundary when this is set.
1043 	 */
1044 	limits->max_hw_discard_sectors =
1045 		(vdo->device_config->max_discard_blocks * VDO_SECTORS_PER_BLOCK);
1046 
1047 	/*
1048 	 * Force discards to not begin or end with a partial block by stating the granularity is
1049 	 * 4k.
1050 	 */
1051 	limits->discard_granularity = VDO_BLOCK_SIZE;
1052 }
1053 
1054 static int vdo_iterate_devices(struct dm_target *ti, iterate_devices_callout_fn fn,
1055 			       void *data)
1056 {
1057 	struct device_config *config = get_vdo_for_target(ti)->device_config;
1058 
1059 	return fn(ti, config->owned_device, 0,
1060 		  config->physical_blocks * VDO_SECTORS_PER_BLOCK, data);
1061 }
1062 
1063 /*
1064  * Status line is:
1065  *    <device> <operating mode> <in recovery> <index state> <compression state>
1066  *    <used physical blocks> <total physical blocks>
1067  */
1068 
/* Emit the dm status, table, or IMA line for this target. */
static void vdo_status(struct dm_target *ti, status_type_t status_type,
		       unsigned int status_flags, char *result, unsigned int maxlen)
{
	struct vdo *vdo = get_vdo_for_target(ti);
	struct vdo_statistics *stats;
	struct device_config *device_config;
	/* N.B.: The DMEMIT macro uses the variables named "sz", "result", "maxlen". */
	int sz = 0;

	switch (status_type) {
	case STATUSTYPE_INFO:
		/* Report info for dmsetup status */
		mutex_lock(&vdo->stats_mutex);
		/* Snapshot the statistics into the vdo's buffer while holding the mutex. */
		vdo_fetch_statistics(vdo, &vdo->stats_buffer);
		stats = &vdo->stats_buffer;

		DMEMIT("/dev/%pg %s %s %s %s %llu %llu",
		       vdo_get_backing_device(vdo), stats->mode,
		       stats->in_recovery_mode ? "recovering" : "-",
		       vdo_get_dedupe_index_state_name(vdo->hash_zones),
		       vdo_get_compressing(vdo) ? "online" : "offline",
		       stats->data_blocks_used + stats->overhead_blocks_used,
		       stats->physical_blocks);
		mutex_unlock(&vdo->stats_mutex);
		break;

	case STATUSTYPE_TABLE:
		/* Report the string actually specified in the beginning. */
		device_config = (struct device_config *) ti->private;
		DMEMIT("%s", device_config->original_string);
		break;

	case STATUSTYPE_IMA:
		/* FIXME: We ought to be more detailed here, but this is what thin does. */
		*result = '\0';
		break;
	}
}
1107 
1108 static block_count_t __must_check get_underlying_device_block_count(const struct vdo *vdo)
1109 {
1110 	return bdev_nr_bytes(vdo_get_backing_device(vdo)) / VDO_BLOCK_SIZE;
1111 }
1112 
1113 static int __must_check process_vdo_message_locked(struct vdo *vdo, unsigned int argc,
1114 						   char **argv)
1115 {
1116 	if ((argc == 2) && (strcasecmp(argv[0], "compression") == 0)) {
1117 		if (strcasecmp(argv[1], "on") == 0) {
1118 			vdo_set_compressing(vdo, true);
1119 			return 0;
1120 		}
1121 
1122 		if (strcasecmp(argv[1], "off") == 0) {
1123 			vdo_set_compressing(vdo, false);
1124 			return 0;
1125 		}
1126 
1127 		vdo_log_warning("invalid argument '%s' to dmsetup compression message",
1128 				argv[1]);
1129 		return -EINVAL;
1130 	}
1131 
1132 	vdo_log_warning("unrecognized dmsetup message '%s' received", argv[0]);
1133 	return -EINVAL;
1134 }
1135 
1136 /*
1137  * If the message is a dump, just do it. Otherwise, check that no other message is being processed,
1138  * and only proceed if so.
1139  * Returns -EBUSY if another message is being processed
1140  */
static int __must_check process_vdo_message(struct vdo *vdo, unsigned int argc,
					    char **argv)
{
	int result;

	/*
	 * All messages which may be processed in parallel with other messages should be handled
	 * here before the atomic check below. Messages which should be exclusive should be
	 * processed in process_vdo_message_locked().
	 */

	/* Dump messages should always be processed */
	if (strcasecmp(argv[0], "dump") == 0)
		return vdo_dump(vdo, argc, argv, "dmsetup message");

	if (argc == 1) {
		if (strcasecmp(argv[0], "dump-on-shutdown") == 0) {
			vdo->dump_on_shutdown = true;
			return 0;
		}

		/* Index messages should always be processed */
		if ((strcasecmp(argv[0], "index-close") == 0) ||
		    (strcasecmp(argv[0], "index-create") == 0) ||
		    (strcasecmp(argv[0], "index-disable") == 0) ||
		    (strcasecmp(argv[0], "index-enable") == 0))
			return vdo_message_dedupe_index(vdo->hash_zones, argv[0]);
	}

	/* Claim the exclusive message slot; report busy if another message holds it. */
	if (atomic_cmpxchg(&vdo->processing_message, 0, 1) != 0)
		return -EBUSY;

	result = process_vdo_message_locked(vdo, argc, argv);

	/*
	 * Pairs with the implicit barrier in cmpxchg just above; orders the message's effects
	 * before the slot is released.
	 */
	smp_wmb();
	atomic_set(&vdo->processing_message, 0);
	return result;
}
1180 
/* Handle a dmsetup message sent to this target. */
static int vdo_message(struct dm_target *ti, unsigned int argc, char **argv,
		       char *result_buffer, unsigned int maxlen)
{
	struct registered_thread allocating_thread, instance_thread;
	struct vdo *vdo;
	int result;

	if (argc == 0) {
		vdo_log_warning("unspecified dmsetup message");
		return -EINVAL;
	}

	vdo = get_vdo_for_target(ti);
	/* Register this thread with the vdo's instance for the duration of the message. */
	vdo_register_allocating_thread(&allocating_thread, NULL);
	vdo_register_thread_device_id(&instance_thread, &vdo->instance);

	/*
	 * Must be done here so we don't map return codes. The code in dm-ioctl expects a 1 for a
	 * return code to look at the buffer and see if it is full or not.
	 */
	if ((argc == 1) && (strcasecmp(argv[0], "stats") == 0)) {
		vdo_write_stats(vdo, result_buffer, maxlen);
		result = 1;
	} else if ((argc == 1) && (strcasecmp(argv[0], "config") == 0)) {
		vdo_write_config(vdo, &result_buffer, &maxlen);
		result = 1;
	} else {
		result = vdo_status_to_errno(process_vdo_message(vdo, argc, argv));
	}

	/* Undo the registrations made above, in reverse order. */
	vdo_unregister_thread_device_id();
	vdo_unregister_allocating_thread();
	return result;
}
1215 
1216 static void configure_target_capabilities(struct dm_target *ti)
1217 {
1218 	ti->discards_supported = 1;
1219 	ti->flush_supported = true;
1220 	ti->num_discard_bios = 1;
1221 	ti->num_flush_bios = 1;
1222 
1223 	/*
1224 	 * If this value changes, please make sure to update the value for max_discard_sectors
1225 	 * accordingly.
1226 	 */
1227 	BUG_ON(dm_set_target_max_io_len(ti, VDO_SECTORS_PER_BLOCK) != 0);
1228 }
1229 
1230 /*
1231  * Implements vdo_filter_fn.
1232  */
1233 static bool vdo_uses_device(struct vdo *vdo, const void *context)
1234 {
1235 	const struct device_config *config = context;
1236 
1237 	return vdo_get_backing_device(vdo)->bd_dev == config->owned_device->bdev->bd_dev;
1238 }
1239 
1240 /**
1241  * get_thread_id_for_phase() - Get the thread id for the current phase of the admin operation in
1242  *                             progress.
1243  * @vdo: The vdo.
1244  */
1245 static thread_id_t __must_check get_thread_id_for_phase(struct vdo *vdo)
1246 {
1247 	switch (vdo->admin.phase) {
1248 	case RESUME_PHASE_PACKER:
1249 	case RESUME_PHASE_FLUSHER:
1250 	case SUSPEND_PHASE_PACKER:
1251 	case SUSPEND_PHASE_FLUSHES:
1252 		return vdo->thread_config.packer_thread;
1253 
1254 	case RESUME_PHASE_DATA_VIOS:
1255 	case SUSPEND_PHASE_DATA_VIOS:
1256 		return vdo->thread_config.cpu_thread;
1257 
1258 	case LOAD_PHASE_DRAIN_JOURNAL:
1259 	case RESUME_PHASE_JOURNAL:
1260 	case SUSPEND_PHASE_JOURNAL:
1261 		return vdo->thread_config.journal_thread;
1262 
1263 	default:
1264 		return vdo->thread_config.admin_thread;
1265 	}
1266 }
1267 
1268 static struct vdo_completion *prepare_admin_completion(struct vdo *vdo,
1269 						       vdo_action_fn callback,
1270 						       vdo_action_fn error_handler)
1271 {
1272 	struct vdo_completion *completion = &vdo->admin.completion;
1273 
1274 	/*
1275 	 * We can't use vdo_prepare_completion_for_requeue() here because we don't want to reset
1276 	 * any error in the completion.
1277 	 */
1278 	completion->callback = callback;
1279 	completion->error_handler = error_handler;
1280 	completion->callback_thread_id = get_thread_id_for_phase(vdo);
1281 	completion->requeue = true;
1282 	return completion;
1283 }
1284 
1285 /**
1286  * advance_phase() - Increment the phase of the current admin operation and prepare the admin
1287  *                   completion to run on the thread for the next phase.
1288  * @vdo: The vdo on which an admin operation is being performed.
1289  *
1290  * Return: The current phase.
1291  */
1292 static u32 advance_phase(struct vdo *vdo)
1293 {
1294 	u32 phase = vdo->admin.phase++;
1295 
1296 	vdo->admin.completion.callback_thread_id = get_thread_id_for_phase(vdo);
1297 	vdo->admin.completion.requeue = true;
1298 	return phase;
1299 }
1300 
1301 /*
1302  * Perform an administrative operation (load, suspend, grow logical, or grow physical). This method
1303  * should not be called from vdo threads.
1304  */
1305 static int perform_admin_operation(struct vdo *vdo, u32 starting_phase,
1306 				   vdo_action_fn callback, vdo_action_fn error_handler,
1307 				   const char *type)
1308 {
1309 	int result;
1310 	struct vdo_administrator *admin = &vdo->admin;
1311 
1312 	if (atomic_cmpxchg(&admin->busy, 0, 1) != 0) {
1313 		return vdo_log_error_strerror(VDO_COMPONENT_BUSY,
1314 					      "Can't start %s operation, another operation is already in progress",
1315 					      type);
1316 	}
1317 
1318 	admin->phase = starting_phase;
1319 	reinit_completion(&admin->callback_sync);
1320 	vdo_reset_completion(&admin->completion);
1321 	vdo_launch_completion(prepare_admin_completion(vdo, callback, error_handler));
1322 
1323 	/*
1324 	 * Using the "interruptible" interface means that Linux will not log a message when we wait
1325 	 * for more than 120 seconds.
1326 	 */
1327 	while (wait_for_completion_interruptible(&admin->callback_sync)) {
1328 		/* However, if we get a signal in a user-mode process, we could spin... */
1329 		fsleep(1000);
1330 	}
1331 
1332 	result = admin->completion.result;
1333 	/* pairs with implicit barrier in cmpxchg above */
1334 	smp_wmb();
1335 	atomic_set(&admin->busy, 0);
1336 	return result;
1337 }
1338 
1339 /* Assert that we are operating on the correct thread for the current phase. */
1340 static void assert_admin_phase_thread(struct vdo *vdo, const char *what)
1341 {
1342 	VDO_ASSERT_LOG_ONLY(vdo_get_callback_thread_id() == get_thread_id_for_phase(vdo),
1343 			    "%s on correct thread for %s", what,
1344 			    ADMIN_PHASE_NAMES[vdo->admin.phase]);
1345 }
1346 
1347 /**
1348  * finish_operation_callback() - Callback to finish an admin operation.
1349  * @completion: The admin_completion.
1350  */
1351 static void finish_operation_callback(struct vdo_completion *completion)
1352 {
1353 	struct vdo_administrator *admin = &completion->vdo->admin;
1354 
1355 	vdo_finish_operation(&admin->state, completion->result);
1356 	complete(&admin->callback_sync);
1357 }
1358 
1359 /**
1360  * decode_from_super_block() - Decode the VDO state from the super block and validate that it is
1361  *                             correct.
1362  * @vdo: The vdo being loaded.
1363  *
1364  * On error from this method, the component states must be destroyed explicitly. If this method
1365  * returns successfully, the component states must not be destroyed.
1366  *
1367  * Return: VDO_SUCCESS or an error.
1368  */
1369 static int __must_check decode_from_super_block(struct vdo *vdo)
1370 {
1371 	const struct device_config *config = vdo->device_config;
1372 	int result;
1373 
1374 	result = vdo_decode_component_states(vdo->super_block.buffer, &vdo->geometry,
1375 					     &vdo->states);
1376 	if (result != VDO_SUCCESS)
1377 		return result;
1378 
1379 	vdo_set_state(vdo, vdo->states.vdo.state);
1380 	vdo->load_state = vdo->states.vdo.state;
1381 
1382 	/*
1383 	 * If the device config specifies a larger logical size than was recorded in the super
1384 	 * block, just accept it.
1385 	 */
1386 	if (vdo->states.vdo.config.logical_blocks < config->logical_blocks) {
1387 		vdo_log_warning("Growing logical size: a logical size of %llu blocks was specified, but that differs from the %llu blocks configured in the vdo super block",
1388 				(unsigned long long) config->logical_blocks,
1389 				(unsigned long long) vdo->states.vdo.config.logical_blocks);
1390 		vdo->states.vdo.config.logical_blocks = config->logical_blocks;
1391 	}
1392 
1393 	result = vdo_validate_component_states(&vdo->states, vdo->geometry.nonce,
1394 					       config->physical_blocks,
1395 					       config->logical_blocks);
1396 	if (result != VDO_SUCCESS)
1397 		return result;
1398 
1399 	vdo->layout = vdo->states.layout;
1400 	return VDO_SUCCESS;
1401 }
1402 
1403 /**
1404  * decode_vdo() - Decode the component data portion of a super block and fill in the corresponding
1405  *                portions of the vdo being loaded.
1406  * @vdo: The vdo being loaded.
1407  *
1408  * This will also allocate the recovery journal and slab depot. If this method is called with an
1409  * asynchronous layer (i.e. a thread config which specifies at least one base thread), the block
1410  * map and packer will be constructed as well.
1411  *
1412  * Return: VDO_SUCCESS or an error.
1413  */
1414 static int __must_check decode_vdo(struct vdo *vdo)
1415 {
1416 	block_count_t maximum_age, journal_length;
1417 	struct partition *partition;
1418 	int result;
1419 
1420 	result = decode_from_super_block(vdo);
1421 	if (result != VDO_SUCCESS) {
1422 		vdo_destroy_component_states(&vdo->states);
1423 		return result;
1424 	}
1425 
1426 	maximum_age = vdo_convert_maximum_age(vdo->device_config->block_map_maximum_age);
1427 	journal_length =
1428 		vdo_get_recovery_journal_length(vdo->states.vdo.config.recovery_journal_size);
1429 	if (maximum_age > (journal_length / 2)) {
1430 		return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
1431 					      "maximum age: %llu exceeds limit %llu",
1432 					      (unsigned long long) maximum_age,
1433 					      (unsigned long long) (journal_length / 2));
1434 	}
1435 
1436 	if (maximum_age == 0) {
1437 		return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
1438 					      "maximum age must be greater than 0");
1439 	}
1440 
1441 	result = vdo_enable_read_only_entry(vdo);
1442 	if (result != VDO_SUCCESS)
1443 		return result;
1444 
1445 	partition = vdo_get_known_partition(&vdo->layout,
1446 					    VDO_RECOVERY_JOURNAL_PARTITION);
1447 	result = vdo_decode_recovery_journal(vdo->states.recovery_journal,
1448 					     vdo->states.vdo.nonce, vdo, partition,
1449 					     vdo->states.vdo.complete_recoveries,
1450 					     vdo->states.vdo.config.recovery_journal_size,
1451 					     &vdo->recovery_journal);
1452 	if (result != VDO_SUCCESS)
1453 		return result;
1454 
1455 	partition = vdo_get_known_partition(&vdo->layout, VDO_SLAB_SUMMARY_PARTITION);
1456 	result = vdo_decode_slab_depot(vdo->states.slab_depot, vdo, partition,
1457 				       &vdo->depot);
1458 	if (result != VDO_SUCCESS)
1459 		return result;
1460 
1461 	result = vdo_decode_block_map(vdo->states.block_map,
1462 				      vdo->states.vdo.config.logical_blocks, vdo,
1463 				      vdo->recovery_journal, vdo->states.vdo.nonce,
1464 				      vdo->device_config->cache_size, maximum_age,
1465 				      &vdo->block_map);
1466 	if (result != VDO_SUCCESS)
1467 		return result;
1468 
1469 	result = vdo_make_physical_zones(vdo, &vdo->physical_zones);
1470 	if (result != VDO_SUCCESS)
1471 		return result;
1472 
1473 	/* The logical zones depend on the physical zones already existing. */
1474 	result = vdo_make_logical_zones(vdo, &vdo->logical_zones);
1475 	if (result != VDO_SUCCESS)
1476 		return result;
1477 
1478 	return vdo_make_hash_zones(vdo, &vdo->hash_zones);
1479 }
1480 
1481 /**
1482  * pre_load_callback() - Callback to initiate a pre-load, registered in vdo_initialize().
1483  * @completion: The admin completion.
1484  */
1485 static void pre_load_callback(struct vdo_completion *completion)
1486 {
1487 	struct vdo *vdo = completion->vdo;
1488 	int result;
1489 
1490 	assert_admin_phase_thread(vdo, __func__);
1491 
1492 	switch (advance_phase(vdo)) {
1493 	case PRE_LOAD_PHASE_START:
1494 		result = vdo_start_operation(&vdo->admin.state,
1495 					     VDO_ADMIN_STATE_PRE_LOADING);
1496 		if (result != VDO_SUCCESS) {
1497 			vdo_continue_completion(completion, result);
1498 			return;
1499 		}
1500 		if (vdo->needs_formatting)
1501 			vdo->admin.phase = PRE_LOAD_PHASE_FORMAT_START;
1502 		else
1503 			vdo->admin.phase = PRE_LOAD_PHASE_LOAD_SUPER;
1504 
1505 		vdo_continue_completion(completion, VDO_SUCCESS);
1506 		return;
1507 
1508 	case PRE_LOAD_PHASE_FORMAT_START:
1509 		vdo_continue_completion(completion, vdo_clear_layout(vdo));
1510 		return;
1511 
1512 	case PRE_LOAD_PHASE_FORMAT_SUPER:
1513 		vdo_save_super_block(vdo, completion);
1514 		return;
1515 
1516 	case PRE_LOAD_PHASE_FORMAT_GEOMETRY:
1517 		vdo_save_geometry_block(vdo, completion);
1518 		return;
1519 
1520 	case PRE_LOAD_PHASE_FORMAT_END:
1521 		/* cleanup layout before load adds to it */
1522 		vdo_uninitialize_layout(&vdo->states.layout);
1523 		vdo_continue_completion(completion, VDO_SUCCESS);
1524 		return;
1525 
1526 	case PRE_LOAD_PHASE_LOAD_SUPER:
1527 		vdo_load_super_block(vdo, completion);
1528 		return;
1529 
1530 	case PRE_LOAD_PHASE_LOAD_COMPONENTS:
1531 		vdo_continue_completion(completion, decode_vdo(vdo));
1532 		return;
1533 
1534 	case PRE_LOAD_PHASE_END:
1535 		break;
1536 
1537 	default:
1538 		vdo_set_completion_result(completion, UDS_BAD_STATE);
1539 	}
1540 
1541 	finish_operation_callback(completion);
1542 }
1543 
/*
 * Release an instance number so it may be reused. The instance must have been allocated by
 * allocate_instance().
 */
static void release_instance(unsigned int instance)
{
	mutex_lock(&instances_lock);
	if (instance >= instances.bit_count) {
		VDO_ASSERT_LOG_ONLY(false,
				    "instance number %u must be less than bit count %u",
				    instance, instances.bit_count);
	} else if (test_bit(instance, instances.words) == 0) {
		VDO_ASSERT_LOG_ONLY(false, "instance number %u must be allocated", instance);
	} else {
		/* The non-atomic __clear_bit is safe because instances_lock is held. */
		__clear_bit(instance, instances.words);
		instances.count -= 1;
	}
	mutex_unlock(&instances_lock);
}
1559 
1560 static void set_device_config(struct dm_target *ti, struct vdo *vdo,
1561 			      struct device_config *config)
1562 {
1563 	list_del_init(&config->config_list);
1564 	list_add_tail(&config->config_list, &vdo->device_config_list);
1565 	config->vdo = vdo;
1566 	ti->private = config;
1567 	configure_target_capabilities(ti);
1568 }
1569 
/**
 * vdo_initialize() - Create a vdo for a new target, run the pre-load admin operation, and bind
 *                    the device config to the new vdo.
 * @ti: The dm target being constructed.
 * @instance: The instance number allocated for this vdo.
 * @config: The parsed device config.
 *
 * Return: VDO_SUCCESS or an error; on error the vdo has been destroyed.
 */
static int vdo_initialize(struct dm_target *ti, unsigned int instance,
			  struct device_config *config)
{
	struct vdo *vdo;
	int result;
	u64 block_size = VDO_BLOCK_SIZE;
	u64 logical_size = to_bytes(ti->len);
	block_count_t logical_blocks = logical_size / block_size;

	vdo_log_info("loading device '%s'", vdo_get_device_name(ti));
	vdo_log_debug("Logical block size     = %llu", (u64) config->logical_block_size);
	vdo_log_debug("Logical blocks         = %llu", logical_blocks);
	vdo_log_debug("Physical block size    = %llu", (u64) block_size);
	vdo_log_debug("Physical blocks        = %llu", config->physical_blocks);
	vdo_log_debug("Slab size              = %llu", config->slab_blocks);
	vdo_log_debug("Block map cache blocks = %u", config->cache_size);
	vdo_log_debug("Block map maximum age  = %u", config->block_map_maximum_age);
	vdo_log_debug("Deduplication          = %s", (config->deduplication ? "on" : "off"));
	vdo_log_debug("Compression            = %s", (config->compression ? "on" : "off"));
	vdo_log_debug("Index memory           = %u", config->index_memory);
	vdo_log_debug("Index sparse           = %s", (config->index_sparse ? "on" : "off"));

	/* A storage device may back at most one running vdo. */
	vdo = vdo_find_matching(vdo_uses_device, config);
	if (vdo != NULL) {
		vdo_log_error("Existing vdo already uses device %s",
			      vdo->device_config->parent_device_name);
		ti->error = "Cannot share storage device with already-running VDO";
		return VDO_BAD_CONFIGURATION;
	}

	result = vdo_make(instance, config, &ti->error, &vdo);
	if (result != VDO_SUCCESS) {
		vdo_log_error("Could not create VDO device. (VDO error %d, message %s)",
			      result, ti->error);
		/* vdo_make may have partially constructed the vdo; destroy what exists. */
		vdo_destroy(vdo);
		return result;
	}

	result = perform_admin_operation(vdo, PRE_LOAD_PHASE_START, pre_load_callback,
					 finish_operation_callback, "pre-load");
	if (result != VDO_SUCCESS) {
		ti->error = ((result == VDO_INVALID_ADMIN_STATE) ?
			     "Pre-load is only valid immediately after initialization" :
			     "Cannot load metadata from device");
		vdo_log_error("Could not start VDO device. (VDO error %d, message %s)",
			      result, ti->error);
		vdo_destroy(vdo);
		return result;
	}

	set_device_config(ti, vdo, config);
	vdo->device_config = config;
	return VDO_SUCCESS;
}
1624 
1625 /* Implements vdo_filter_fn. */
1626 static bool __must_check vdo_is_named(struct vdo *vdo, const void *context)
1627 {
1628 	struct dm_target *ti = vdo->device_config->owning_target;
1629 	const char *device_name = vdo_get_device_name(ti);
1630 
1631 	return strcmp(device_name, context) == 0;
1632 }
1633 
1634 /**
1635  * get_bit_array_size() - Return the number of bytes needed to store a bit array of the specified
1636  *                        capacity in an array of unsigned longs.
1637  * @bit_count: The number of bits the array must hold.
1638  *
1639  * Return: the number of bytes needed for the array representation.
1640  */
1641 static size_t get_bit_array_size(unsigned int bit_count)
1642 {
1643 	/* Round up to a multiple of the word size and convert to a byte count. */
1644 	return (BITS_TO_LONGS(bit_count) * sizeof(unsigned long));
1645 }
1646 
1647 /**
1648  * grow_bit_array() - Re-allocate the bitmap word array so there will more instance numbers that
1649  *                    can be allocated.
1650  *
1651  * Since the array is initially NULL, this also initializes the array the first time we allocate an
1652  * instance number.
1653  *
1654  * Return: VDO_SUCCESS or an error code from the allocation
1655  */
1656 static int grow_bit_array(void)
1657 {
1658 	unsigned int new_count = max(instances.bit_count + BIT_COUNT_INCREMENT,
1659 				     (unsigned int) BIT_COUNT_MINIMUM);
1660 	unsigned long *new_words;
1661 	int result;
1662 
1663 	result = vdo_reallocate_memory(instances.words,
1664 				       get_bit_array_size(instances.bit_count),
1665 				       get_bit_array_size(new_count),
1666 				       "instance number bit array", &new_words);
1667 	if (result != VDO_SUCCESS)
1668 		return result;
1669 
1670 	instances.bit_count = new_count;
1671 	instances.words = new_words;
1672 	return VDO_SUCCESS;
1673 }
1674 
1675 /**
1676  * allocate_instance() - Allocate an instance number.
1677  * @instance_ptr: A point to hold the instance number
1678  *
1679  * Return: VDO_SUCCESS or an error code
1680  *
1681  * This function must be called while holding the instances lock.
1682  */
1683 static int allocate_instance(unsigned int *instance_ptr)
1684 {
1685 	unsigned int instance;
1686 	int result;
1687 
1688 	/* If there are no unallocated instances, grow the bit array. */
1689 	if (instances.count >= instances.bit_count) {
1690 		result = grow_bit_array();
1691 		if (result != VDO_SUCCESS)
1692 			return result;
1693 	}
1694 
1695 	/*
1696 	 * There must be a zero bit somewhere now. Find it, starting just after the last instance
1697 	 * allocated.
1698 	 */
1699 	instance = find_next_zero_bit(instances.words, instances.bit_count,
1700 				      instances.next);
1701 	if (instance >= instances.bit_count) {
1702 		/* Nothing free after next, so wrap around to instance zero. */
1703 		instance = find_first_zero_bit(instances.words, instances.bit_count);
1704 		result = VDO_ASSERT(instance < instances.bit_count,
1705 				    "impossibly, no zero bit found");
1706 		if (result != VDO_SUCCESS)
1707 			return result;
1708 	}
1709 
1710 	__set_bit(instance, instances.words);
1711 	instances.count++;
1712 	instances.next = instance + 1;
1713 	*instance_ptr = instance;
1714 	return VDO_SUCCESS;
1715 }
1716 
/*
 * Parse the table line and initialize a new vdo for it. The caller must have allocated
 * @instance and registered it with the thread registry; on failure, the instance is released
 * here.
 */
static int construct_new_vdo_registered(struct dm_target *ti, unsigned int argc,
					char **argv, unsigned int instance)
{
	int result;
	struct device_config *config;

	result = parse_device_config(argc, argv, ti, &config);
	if (result != VDO_SUCCESS) {
		vdo_log_error_strerror(result, "parsing failed: %s", ti->error);
		release_instance(instance);
		return -EINVAL;
	}

	/* Beyond this point, the instance number will be cleaned up for us if needed */
	result = vdo_initialize(ti, instance, config);
	if (result != VDO_SUCCESS) {
		release_instance(instance);
		free_device_config(config);
		return vdo_status_to_errno(result);
	}

	return VDO_SUCCESS;
}
1740 
/* Allocate an instance number and construct a new vdo registered under it. */
static int construct_new_vdo(struct dm_target *ti, unsigned int argc, char **argv)
{
	int result;
	unsigned int instance;
	struct registered_thread instance_thread;

	mutex_lock(&instances_lock);
	result = allocate_instance(&instance);
	mutex_unlock(&instances_lock);
	if (result != VDO_SUCCESS)
		return -ENOMEM;

	/* Register the instance so log messages during construction identify this device. */
	vdo_register_thread_device_id(&instance_thread, &instance);
	result = construct_new_vdo_registered(ti, argc, argv, instance);
	vdo_unregister_thread_device_id();
	return result;
}
1758 
1759 /**
1760  * check_may_grow_physical() - Callback to check that we're not in recovery mode, used in
1761  *                             vdo_prepare_to_grow_physical().
1762  * @completion: The admin completion.
1763  */
1764 static void check_may_grow_physical(struct vdo_completion *completion)
1765 {
1766 	struct vdo *vdo = completion->vdo;
1767 
1768 	assert_admin_phase_thread(vdo, __func__);
1769 
1770 	/* These checks can only be done from a vdo thread. */
1771 	if (vdo_is_read_only(vdo))
1772 		vdo_set_completion_result(completion, VDO_READ_ONLY);
1773 
1774 	if (vdo_in_recovery_mode(vdo))
1775 		vdo_set_completion_result(completion, VDO_RETRY_AFTER_REBUILD);
1776 
1777 	finish_operation_callback(completion);
1778 }
1779 
1780 static block_count_t get_partition_size(struct layout *layout, enum partition_id id)
1781 {
1782 	return vdo_get_known_partition(layout, id)->count;
1783 }
1784 
1785 /**
1786  * grow_layout() - Make the layout for growing a vdo.
1787  * @vdo: The vdo preparing to grow.
1788  * @old_size: The current size of the vdo.
1789  * @new_size: The size to which the vdo will be grown.
1790  *
1791  * Return: VDO_SUCCESS or an error code.
1792  */
1793 static int grow_layout(struct vdo *vdo, block_count_t old_size, block_count_t new_size)
1794 {
1795 	int result;
1796 	block_count_t min_new_size;
1797 
1798 	if (vdo->next_layout.size == new_size) {
1799 		/* We are already prepared to grow to the new size, so we're done. */
1800 		return VDO_SUCCESS;
1801 	}
1802 
1803 	/* Make a copy completion if there isn't one */
1804 	if (vdo->partition_copier == NULL) {
1805 		vdo->partition_copier = dm_kcopyd_client_create(NULL);
1806 		if (IS_ERR(vdo->partition_copier)) {
1807 			result = PTR_ERR(vdo->partition_copier);
1808 			vdo->partition_copier = NULL;
1809 			return result;
1810 		}
1811 	}
1812 
1813 	/* Free any unused preparation. */
1814 	vdo_uninitialize_layout(&vdo->next_layout);
1815 
1816 	/*
1817 	 * Make a new layout with the existing partition sizes for everything but the slab depot
1818 	 * partition.
1819 	 */
1820 	result = vdo_initialize_layout(new_size, vdo->layout.start,
1821 				       get_partition_size(&vdo->layout,
1822 							  VDO_BLOCK_MAP_PARTITION),
1823 				       get_partition_size(&vdo->layout,
1824 							  VDO_RECOVERY_JOURNAL_PARTITION),
1825 				       get_partition_size(&vdo->layout,
1826 							  VDO_SLAB_SUMMARY_PARTITION),
1827 				       &vdo->next_layout);
1828 	if (result != VDO_SUCCESS) {
1829 		dm_kcopyd_client_destroy(vdo_forget(vdo->partition_copier));
1830 		return result;
1831 	}
1832 
1833 	/* Ensure the new journal and summary are entirely within the added blocks. */
1834 	min_new_size = (old_size +
1835 			get_partition_size(&vdo->next_layout,
1836 					   VDO_SLAB_SUMMARY_PARTITION) +
1837 			get_partition_size(&vdo->next_layout,
1838 					   VDO_RECOVERY_JOURNAL_PARTITION));
1839 	if (min_new_size > new_size) {
1840 		/* Copying the journal and summary would destroy some old metadata. */
1841 		vdo_uninitialize_layout(&vdo->next_layout);
1842 		dm_kcopyd_client_destroy(vdo_forget(vdo->partition_copier));
1843 		return VDO_INCREMENT_TOO_SMALL;
1844 	}
1845 
1846 	return VDO_SUCCESS;
1847 }
1848 
/**
 * prepare_to_grow_physical() - Prepare the vdo to grow to a larger physical size.
 * @vdo: The vdo to prepare.
 * @new_physical_blocks: The new physical size, in blocks.
 *
 * Builds the next layout and the new slabs which a subsequent grow physical will apply.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int prepare_to_grow_physical(struct vdo *vdo, block_count_t new_physical_blocks)
{
	int result;
	block_count_t current_physical_blocks = vdo->states.vdo.config.physical_blocks;

	vdo_log_info("Preparing to resize physical to %llu",
		     (unsigned long long) new_physical_blocks);
	VDO_ASSERT_LOG_ONLY((new_physical_blocks > current_physical_blocks),
			    "New physical size is larger than current physical size");
	/* Refuse to prepare while read-only or in recovery mode (checked on a vdo thread). */
	result = perform_admin_operation(vdo, PREPARE_GROW_PHYSICAL_PHASE_START,
					 check_may_grow_physical,
					 finish_operation_callback,
					 "prepare grow-physical");
	if (result != VDO_SUCCESS)
		return result;

	/* Lay out the larger volume, including relocated journal and summary partitions. */
	result = grow_layout(vdo, current_physical_blocks, new_physical_blocks);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_prepare_to_grow_slab_depot(vdo->depot,
						vdo_get_known_partition(&vdo->next_layout,
									VDO_SLAB_DEPOT_PARTITION));
	if (result != VDO_SUCCESS) {
		/* Discard the prepared layout so a later attempt starts from scratch. */
		vdo_uninitialize_layout(&vdo->next_layout);
		return result;
	}

	vdo_log_info("Done preparing to resize physical");
	return VDO_SUCCESS;
}
1880 
1881 /**
1882  * validate_new_device_config() - Check whether a new device config represents a valid modification
1883  *				  to an existing config.
1884  * @to_validate: The new config to validate.
1885  * @config: The existing config.
1886  * @may_grow: Set to true if growing the logical and physical size of the vdo is currently
1887  *	      permitted.
1888  * @error_ptr: A pointer to hold the reason for any error.
1889  *
1890  * Return: VDO_SUCCESS or an error.
1891  */
1892 static int validate_new_device_config(struct device_config *to_validate,
1893 				      struct device_config *config, bool may_grow,
1894 				      char **error_ptr)
1895 {
1896 	if (to_validate->owning_target->begin != config->owning_target->begin) {
1897 		*error_ptr = "Starting sector cannot change";
1898 		return VDO_PARAMETER_MISMATCH;
1899 	}
1900 
1901 	if (to_validate->logical_block_size != config->logical_block_size) {
1902 		*error_ptr = "Logical block size cannot change";
1903 		return VDO_PARAMETER_MISMATCH;
1904 	}
1905 
1906 	if (to_validate->logical_blocks < config->logical_blocks) {
1907 		*error_ptr = "Can't shrink VDO logical size";
1908 		return VDO_PARAMETER_MISMATCH;
1909 	}
1910 
1911 	if (to_validate->cache_size != config->cache_size) {
1912 		*error_ptr = "Block map cache size cannot change";
1913 		return VDO_PARAMETER_MISMATCH;
1914 	}
1915 
1916 	if (to_validate->block_map_maximum_age != config->block_map_maximum_age) {
1917 		*error_ptr = "Block map maximum age cannot change";
1918 		return VDO_PARAMETER_MISMATCH;
1919 	}
1920 
1921 	if (memcmp(&to_validate->thread_counts, &config->thread_counts,
1922 		   sizeof(struct thread_count_config)) != 0) {
1923 		*error_ptr = "Thread configuration cannot change";
1924 		return VDO_PARAMETER_MISMATCH;
1925 	}
1926 
1927 	if (to_validate->physical_blocks < config->physical_blocks) {
1928 		*error_ptr = "Removing physical storage from a VDO is not supported";
1929 		return VDO_NOT_IMPLEMENTED;
1930 	}
1931 
1932 	if (!may_grow && (to_validate->physical_blocks > config->physical_blocks)) {
1933 		*error_ptr = "VDO physical size may not grow in current state";
1934 		return VDO_NOT_IMPLEMENTED;
1935 	}
1936 
1937 	return VDO_SUCCESS;
1938 }
1939 
/**
 * prepare_to_modify() - Validate a new table against an existing vdo and do any needed
 *                       preparation for growth.
 * @ti: The new device-mapper target.
 * @config: The newly parsed device config.
 * @vdo: The existing vdo being modified.
 *
 * Return: VDO_SUCCESS, a VDO status code, or a negative errno.
 */
static int prepare_to_modify(struct dm_target *ti, struct device_config *config,
			     struct vdo *vdo)
{
	int result;
	/* Growth is only permitted once the vdo is past the pre-loaded state. */
	bool may_grow = (vdo_get_admin_state(vdo) != VDO_ADMIN_STATE_PRE_LOADED);

	result = validate_new_device_config(config, vdo->device_config, may_grow,
					    &ti->error);
	if (result != VDO_SUCCESS)
		return -EINVAL;

	if (config->logical_blocks > vdo->device_config->logical_blocks) {
		block_count_t logical_blocks = vdo->states.vdo.config.logical_blocks;

		vdo_log_info("Preparing to resize logical to %llu",
			     (unsigned long long) config->logical_blocks);
		VDO_ASSERT_LOG_ONLY((config->logical_blocks > logical_blocks),
				    "New logical size is larger than current size");

		/* Stage the block map growth; it is applied when the new table resumes. */
		result = vdo_prepare_to_grow_block_map(vdo->block_map,
						       config->logical_blocks);
		if (result != VDO_SUCCESS) {
			ti->error = "Device vdo_prepare_to_grow_logical failed";
			return result;
		}

		vdo_log_info("Done preparing to resize logical");
	}

	if (config->physical_blocks > vdo->device_config->physical_blocks) {
		result = prepare_to_grow_physical(vdo, config->physical_blocks);
		if (result != VDO_SUCCESS) {
			if (result == VDO_PARAMETER_MISMATCH) {
				/*
				 * If we don't trap this case, vdo_status_to_errno() will remap
				 * it to -EIO, which is misleading and ahistorical.
				 */
				result = -EINVAL;
			}

			if (result == VDO_TOO_MANY_SLABS)
				ti->error = "Device vdo_prepare_to_grow_physical failed (specified physical size too big based on formatted slab size)";
			else
				ti->error = "Device vdo_prepare_to_grow_physical failed";

			return result;
		}
	}

	if (strcmp(config->parent_device_name, vdo->device_config->parent_device_name) != 0) {
		const char *device_name = vdo_get_device_name(config->owning_target);

		/* The backing device change takes effect when the new config is adopted. */
		vdo_log_info("Updating backing device of %s from %s to %s", device_name,
			     vdo->device_config->parent_device_name,
			     config->parent_device_name);
	}

	return VDO_SUCCESS;
}
1999 
2000 static int update_existing_vdo(const char *device_name, struct dm_target *ti,
2001 			       unsigned int argc, char **argv, struct vdo *vdo)
2002 {
2003 	int result;
2004 	struct device_config *config;
2005 
2006 	result = parse_device_config(argc, argv, ti, &config);
2007 	if (result != VDO_SUCCESS)
2008 		return -EINVAL;
2009 
2010 	vdo_log_info("preparing to modify device '%s'", device_name);
2011 	result = prepare_to_modify(ti, config, vdo);
2012 	if (result != VDO_SUCCESS) {
2013 		free_device_config(config);
2014 		return vdo_status_to_errno(result);
2015 	}
2016 
2017 	set_device_config(ti, vdo, config);
2018 	return VDO_SUCCESS;
2019 }
2020 
2021 static int vdo_ctr(struct dm_target *ti, unsigned int argc, char **argv)
2022 {
2023 	int result;
2024 	struct registered_thread allocating_thread, instance_thread;
2025 	const char *device_name;
2026 	struct vdo *vdo;
2027 
2028 	vdo_register_allocating_thread(&allocating_thread, NULL);
2029 	device_name = vdo_get_device_name(ti);
2030 	vdo = vdo_find_matching(vdo_is_named, device_name);
2031 	if (vdo == NULL) {
2032 		result = construct_new_vdo(ti, argc, argv);
2033 	} else {
2034 		vdo_register_thread_device_id(&instance_thread, &vdo->instance);
2035 		result = update_existing_vdo(device_name, ti, argc, argv, vdo);
2036 		vdo_unregister_thread_device_id();
2037 	}
2038 
2039 	vdo_unregister_allocating_thread();
2040 	return result;
2041 }
2042 
/**
 * vdo_dtr() - Device-mapper target destructor.
 * @ti: The target being destroyed.
 *
 * Frees this target's config, and destroys the vdo itself when the last config referencing it
 * goes away.
 */
static void vdo_dtr(struct dm_target *ti)
{
	struct device_config *config = ti->private;
	struct vdo *vdo = vdo_forget(config->vdo);

	list_del_init(&config->config_list);
	if (list_empty(&vdo->device_config_list)) {
		const char *device_name;

		/* This was the last config referencing the VDO. Free it. */
		unsigned int instance = vdo->instance;
		struct registered_thread allocating_thread, instance_thread;

		vdo_register_thread_device_id(&instance_thread, &instance);
		vdo_register_allocating_thread(&allocating_thread, NULL);

		device_name = vdo_get_device_name(ti);
		vdo_log_info("stopping device '%s'", device_name);
		if (vdo->dump_on_shutdown)
			vdo_dump_all(vdo, "device shutdown");

		vdo_destroy(vdo_forget(vdo));
		vdo_log_info("device '%s' stopped", device_name);
		vdo_unregister_thread_device_id();
		vdo_unregister_allocating_thread();
		/* The instance number may now be reused by a new device. */
		release_instance(instance);
	} else if (config == vdo->device_config) {
		/*
		 * The VDO still references this config. Give it a reference to a config that isn't
		 * being destroyed.
		 */
		vdo->device_config = list_first_entry(&vdo->device_config_list,
						      struct device_config, config_list);
	}

	free_device_config(config);
	ti->private = NULL;
}
2081 
2082 static void vdo_presuspend(struct dm_target *ti)
2083 {
2084 	get_vdo_for_target(ti)->suspend_type =
2085 		(dm_noflush_suspending(ti) ? VDO_ADMIN_STATE_SUSPENDING : VDO_ADMIN_STATE_SAVING);
2086 }
2087 
2088 /**
2089  * write_super_block_for_suspend() - Update the VDO state and save the super block.
2090  * @completion: The admin completion
2091  */
2092 static void write_super_block_for_suspend(struct vdo_completion *completion)
2093 {
2094 	struct vdo *vdo = completion->vdo;
2095 
2096 	switch (vdo_get_state(vdo)) {
2097 	case VDO_DIRTY:
2098 	case VDO_NEW:
2099 		vdo_set_state(vdo, VDO_CLEAN);
2100 		break;
2101 
2102 	case VDO_CLEAN:
2103 	case VDO_READ_ONLY_MODE:
2104 	case VDO_FORCE_REBUILD:
2105 	case VDO_RECOVERING:
2106 	case VDO_REBUILD_FOR_UPGRADE:
2107 		break;
2108 
2109 	case VDO_REPLAYING:
2110 	default:
2111 		vdo_continue_completion(completion, UDS_BAD_STATE);
2112 		return;
2113 	}
2114 
2115 	vdo_save_components(vdo, completion);
2116 }
2117 
2118 /**
2119  * suspend_callback() - Callback to initiate a suspend, registered in vdo_postsuspend().
2120  * @completion: The sub-task completion.
2121  */
2122 static void suspend_callback(struct vdo_completion *completion)
2123 {
2124 	struct vdo *vdo = completion->vdo;
2125 	struct admin_state *state = &vdo->admin.state;
2126 	int result;
2127 
2128 	assert_admin_phase_thread(vdo, __func__);
2129 
2130 	switch (advance_phase(vdo)) {
2131 	case SUSPEND_PHASE_START:
2132 		if (vdo_get_admin_state_code(state)->quiescent) {
2133 			/* Already suspended */
2134 			break;
2135 		}
2136 
2137 		vdo_continue_completion(completion,
2138 					vdo_start_operation(state, vdo->suspend_type));
2139 		return;
2140 
2141 	case SUSPEND_PHASE_PACKER:
2142 		/*
2143 		 * If the VDO was already resumed from a prior suspend while read-only, some of the
2144 		 * components may not have been resumed. By setting a read-only error here, we
2145 		 * guarantee that the result of this suspend will be VDO_READ_ONLY and not
2146 		 * VDO_INVALID_ADMIN_STATE in that case.
2147 		 */
2148 		if (vdo_in_read_only_mode(vdo))
2149 			vdo_set_completion_result(completion, VDO_READ_ONLY);
2150 
2151 		vdo_drain_packer(vdo->packer, completion);
2152 		return;
2153 
2154 	case SUSPEND_PHASE_DATA_VIOS:
2155 		drain_data_vio_pool(vdo->data_vio_pool, completion);
2156 		return;
2157 
2158 	case SUSPEND_PHASE_DEDUPE:
2159 		vdo_drain_hash_zones(vdo->hash_zones, completion);
2160 		return;
2161 
2162 	case SUSPEND_PHASE_FLUSHES:
2163 		vdo_drain_flusher(vdo->flusher, completion);
2164 		return;
2165 
2166 	case SUSPEND_PHASE_LOGICAL_ZONES:
2167 		/*
2168 		 * Attempt to flush all I/O before completing post suspend work. We believe a
2169 		 * suspended device is expected to have persisted all data written before the
2170 		 * suspend, even if it hasn't been flushed yet.
2171 		 */
2172 		result = vdo_synchronous_flush(vdo);
2173 		if (result != VDO_SUCCESS)
2174 			vdo_enter_read_only_mode(vdo, result);
2175 
2176 		vdo_drain_logical_zones(vdo->logical_zones,
2177 					vdo_get_admin_state_code(state), completion);
2178 		return;
2179 
2180 	case SUSPEND_PHASE_BLOCK_MAP:
2181 		vdo_drain_block_map(vdo->block_map, vdo_get_admin_state_code(state),
2182 				    completion);
2183 		return;
2184 
2185 	case SUSPEND_PHASE_JOURNAL:
2186 		vdo_drain_recovery_journal(vdo->recovery_journal,
2187 					   vdo_get_admin_state_code(state), completion);
2188 		return;
2189 
2190 	case SUSPEND_PHASE_DEPOT:
2191 		vdo_drain_slab_depot(vdo->depot, vdo_get_admin_state_code(state),
2192 				     completion);
2193 		return;
2194 
2195 	case SUSPEND_PHASE_READ_ONLY_WAIT:
2196 		vdo_wait_until_not_entering_read_only_mode(completion);
2197 		return;
2198 
2199 	case SUSPEND_PHASE_WRITE_SUPER_BLOCK:
2200 		if (vdo_is_state_suspending(state) || (completion->result != VDO_SUCCESS)) {
2201 			/* If we didn't save the VDO or there was an error, we're done. */
2202 			break;
2203 		}
2204 
2205 		write_super_block_for_suspend(completion);
2206 		return;
2207 
2208 	case SUSPEND_PHASE_END:
2209 		break;
2210 
2211 	default:
2212 		vdo_set_completion_result(completion, UDS_BAD_STATE);
2213 	}
2214 
2215 	finish_operation_callback(completion);
2216 }
2217 
2218 static void vdo_postsuspend(struct dm_target *ti)
2219 {
2220 	struct vdo *vdo = get_vdo_for_target(ti);
2221 	struct registered_thread instance_thread;
2222 	const char *device_name;
2223 	int result;
2224 
2225 	vdo_register_thread_device_id(&instance_thread, &vdo->instance);
2226 	device_name = vdo_get_device_name(vdo->device_config->owning_target);
2227 	vdo_log_info("suspending device '%s'", device_name);
2228 
2229 	/*
2230 	 * It's important to note any error here does not actually stop device-mapper from
2231 	 * suspending the device. All this work is done post suspend.
2232 	 */
2233 	result = perform_admin_operation(vdo, SUSPEND_PHASE_START, suspend_callback,
2234 					 suspend_callback, "suspend");
2235 
2236 	if ((result == VDO_SUCCESS) || (result == VDO_READ_ONLY)) {
2237 		/*
2238 		 * Treat VDO_READ_ONLY as a success since a read-only suspension still leaves the
2239 		 * VDO suspended.
2240 		 */
2241 		vdo_log_info("device '%s' suspended", device_name);
2242 	} else if (result == VDO_INVALID_ADMIN_STATE) {
2243 		vdo_log_error("Suspend invoked while in unexpected state: %s",
2244 			      vdo_get_admin_state(vdo)->name);
2245 	} else {
2246 		vdo_log_error_strerror(result, "Suspend of device '%s' failed",
2247 				       device_name);
2248 	}
2249 
2250 	vdo_unregister_thread_device_id();
2251 }
2252 
2253 /**
2254  * was_new() - Check whether the vdo was new when it was loaded.
2255  * @vdo: The vdo to query.
2256  *
2257  * Return: true if the vdo was new.
2258  */
2259 static bool was_new(const struct vdo *vdo)
2260 {
2261 	return (vdo->load_state == VDO_NEW);
2262 }
2263 
2264 /**
2265  * requires_repair() - Check whether a vdo requires recovery or rebuild.
2266  * @vdo: The vdo to query.
2267  *
2268  * Return: true if the vdo must be repaired.
2269  */
2270 static bool __must_check requires_repair(const struct vdo *vdo)
2271 {
2272 	switch (vdo_get_state(vdo)) {
2273 	case VDO_DIRTY:
2274 	case VDO_FORCE_REBUILD:
2275 	case VDO_REPLAYING:
2276 	case VDO_REBUILD_FOR_UPGRADE:
2277 		return true;
2278 
2279 	default:
2280 		return false;
2281 	}
2282 }
2283 
2284 /**
2285  * get_load_type() - Determine how the slab depot was loaded.
2286  * @vdo: The vdo.
2287  *
2288  * Return: How the depot was loaded.
2289  */
2290 static enum slab_depot_load_type get_load_type(struct vdo *vdo)
2291 {
2292 	if (vdo_state_requires_read_only_rebuild(vdo->load_state))
2293 		return VDO_SLAB_DEPOT_REBUILD_LOAD;
2294 
2295 	if (vdo_state_requires_recovery(vdo->load_state))
2296 		return VDO_SLAB_DEPOT_RECOVERY_LOAD;
2297 
2298 	return VDO_SLAB_DEPOT_NORMAL_LOAD;
2299 }
2300 
2301 /**
2302  * load_callback() - Callback to do the destructive parts of loading a VDO.
2303  * @completion: The sub-task completion.
2304  */
2305 static void load_callback(struct vdo_completion *completion)
2306 {
2307 	struct vdo *vdo = completion->vdo;
2308 	int result;
2309 
2310 	assert_admin_phase_thread(vdo, __func__);
2311 
2312 	switch (advance_phase(vdo)) {
2313 	case LOAD_PHASE_START:
2314 		result = vdo_start_operation(&vdo->admin.state, VDO_ADMIN_STATE_LOADING);
2315 		if (result != VDO_SUCCESS) {
2316 			vdo_continue_completion(completion, result);
2317 			return;
2318 		}
2319 
2320 		/* Prepare the recovery journal for new entries. */
2321 		vdo_open_recovery_journal(vdo->recovery_journal, vdo->depot,
2322 					  vdo->block_map);
2323 		vdo_allow_read_only_mode_entry(completion);
2324 		return;
2325 
2326 	case LOAD_PHASE_LOAD_DEPOT:
2327 		vdo_set_dedupe_state_normal(vdo->hash_zones);
2328 		if (vdo_is_read_only(vdo)) {
2329 			/*
2330 			 * In read-only mode we don't use the allocator and it may not even be
2331 			 * readable, so don't bother trying to load it.
2332 			 */
2333 			vdo_set_completion_result(completion, VDO_READ_ONLY);
2334 			break;
2335 		}
2336 
2337 		if (requires_repair(vdo)) {
2338 			vdo_repair(completion);
2339 			return;
2340 		}
2341 
2342 		vdo_load_slab_depot(vdo->depot,
2343 				    (was_new(vdo) ? VDO_ADMIN_STATE_FORMATTING :
2344 				     VDO_ADMIN_STATE_LOADING),
2345 				    completion, NULL);
2346 		return;
2347 
2348 	case LOAD_PHASE_MAKE_DIRTY:
2349 		vdo_set_state(vdo, VDO_DIRTY);
2350 		vdo_save_components(vdo, completion);
2351 		return;
2352 
2353 	case LOAD_PHASE_PREPARE_TO_ALLOCATE:
2354 		vdo_initialize_block_map_from_journal(vdo->block_map,
2355 						      vdo->recovery_journal);
2356 		vdo_prepare_slab_depot_to_allocate(vdo->depot, get_load_type(vdo),
2357 						   completion);
2358 		return;
2359 
2360 	case LOAD_PHASE_SCRUB_SLABS:
2361 		if (vdo_state_requires_recovery(vdo->load_state))
2362 			vdo_enter_recovery_mode(vdo);
2363 
2364 		vdo_scrub_all_unrecovered_slabs(vdo->depot, completion);
2365 		return;
2366 
2367 	case LOAD_PHASE_DATA_REDUCTION:
2368 		WRITE_ONCE(vdo->compressing, vdo->device_config->compression);
2369 		if (vdo->device_config->deduplication) {
2370 			/*
2371 			 * Don't try to load or rebuild the index first (and log scary error
2372 			 * messages) if this is known to be a newly-formatted volume.
2373 			 */
2374 			vdo_start_dedupe_index(vdo->hash_zones, was_new(vdo));
2375 		}
2376 
2377 		vdo->allocations_allowed = false;
2378 		fallthrough;
2379 
2380 	case LOAD_PHASE_FINISHED:
2381 		break;
2382 
2383 	case LOAD_PHASE_DRAIN_JOURNAL:
2384 		vdo_drain_recovery_journal(vdo->recovery_journal, VDO_ADMIN_STATE_SAVING,
2385 					   completion);
2386 		return;
2387 
2388 	case LOAD_PHASE_WAIT_FOR_READ_ONLY:
2389 		/* Avoid an infinite loop */
2390 		completion->error_handler = NULL;
2391 		vdo->admin.phase = LOAD_PHASE_FINISHED;
2392 		vdo_wait_until_not_entering_read_only_mode(completion);
2393 		return;
2394 
2395 	default:
2396 		vdo_set_completion_result(completion, UDS_BAD_STATE);
2397 	}
2398 
2399 	finish_operation_callback(completion);
2400 }
2401 
2402 /**
2403  * handle_load_error() - Handle an error during the load operation.
2404  * @completion: The admin completion.
2405  *
2406  * If at all possible, brings the vdo online in read-only mode. This handler is registered in
2407  * vdo_preresume_registered().
2408  */
2409 static void handle_load_error(struct vdo_completion *completion)
2410 {
2411 	struct vdo *vdo = completion->vdo;
2412 
2413 	if (vdo_requeue_completion_if_needed(completion,
2414 					     vdo->thread_config.admin_thread))
2415 		return;
2416 
2417 	if (vdo_state_requires_read_only_rebuild(vdo->load_state) &&
2418 	    (vdo->admin.phase == LOAD_PHASE_MAKE_DIRTY)) {
2419 		vdo_log_error_strerror(completion->result, "aborting load");
2420 		vdo->admin.phase = LOAD_PHASE_DRAIN_JOURNAL;
2421 		load_callback(vdo_forget(completion));
2422 		return;
2423 	}
2424 
2425 	if ((completion->result == VDO_UNSUPPORTED_VERSION) &&
2426 	    (vdo->admin.phase == LOAD_PHASE_MAKE_DIRTY)) {
2427 		vdo_log_error("Aborting load due to unsupported version");
2428 		vdo->admin.phase = LOAD_PHASE_FINISHED;
2429 		load_callback(completion);
2430 		return;
2431 	}
2432 
2433 	vdo_log_error_strerror(completion->result,
2434 			       "Entering read-only mode due to load error");
2435 	vdo->admin.phase = LOAD_PHASE_WAIT_FOR_READ_ONLY;
2436 	vdo_enter_read_only_mode(vdo, completion->result);
2437 	completion->result = VDO_READ_ONLY;
2438 	load_callback(completion);
2439 }
2440 
2441 /**
2442  * write_super_block_for_resume() - Update the VDO state and save the super block.
2443  * @completion: The admin completion
2444  */
2445 static void write_super_block_for_resume(struct vdo_completion *completion)
2446 {
2447 	struct vdo *vdo = completion->vdo;
2448 
2449 	switch (vdo_get_state(vdo)) {
2450 	case VDO_CLEAN:
2451 	case VDO_NEW:
2452 		vdo_set_state(vdo, VDO_DIRTY);
2453 		vdo_save_components(vdo, completion);
2454 		return;
2455 
2456 	case VDO_DIRTY:
2457 	case VDO_READ_ONLY_MODE:
2458 	case VDO_FORCE_REBUILD:
2459 	case VDO_RECOVERING:
2460 	case VDO_REBUILD_FOR_UPGRADE:
2461 		/* No need to write the super block in these cases */
2462 		vdo_launch_completion(completion);
2463 		return;
2464 
2465 	case VDO_REPLAYING:
2466 	default:
2467 		vdo_continue_completion(completion, UDS_BAD_STATE);
2468 	}
2469 }
2470 
2471 /**
2472  * resume_callback() - Callback to resume a VDO.
2473  * @completion: The admin completion.
2474  */
2475 static void resume_callback(struct vdo_completion *completion)
2476 {
2477 	struct vdo *vdo = completion->vdo;
2478 	int result;
2479 
2480 	assert_admin_phase_thread(vdo, __func__);
2481 
2482 	switch (advance_phase(vdo)) {
2483 	case RESUME_PHASE_START:
2484 		result = vdo_start_operation(&vdo->admin.state,
2485 					     VDO_ADMIN_STATE_RESUMING);
2486 		if (result != VDO_SUCCESS) {
2487 			vdo_continue_completion(completion, result);
2488 			return;
2489 		}
2490 
2491 		write_super_block_for_resume(completion);
2492 		return;
2493 
2494 	case RESUME_PHASE_ALLOW_READ_ONLY_MODE:
2495 		vdo_allow_read_only_mode_entry(completion);
2496 		return;
2497 
2498 	case RESUME_PHASE_DEDUPE:
2499 		vdo_resume_hash_zones(vdo->hash_zones, completion);
2500 		return;
2501 
2502 	case RESUME_PHASE_DEPOT:
2503 		vdo_resume_slab_depot(vdo->depot, completion);
2504 		return;
2505 
2506 	case RESUME_PHASE_JOURNAL:
2507 		vdo_resume_recovery_journal(vdo->recovery_journal, completion);
2508 		return;
2509 
2510 	case RESUME_PHASE_BLOCK_MAP:
2511 		vdo_resume_block_map(vdo->block_map, completion);
2512 		return;
2513 
2514 	case RESUME_PHASE_LOGICAL_ZONES:
2515 		vdo_resume_logical_zones(vdo->logical_zones, completion);
2516 		return;
2517 
2518 	case RESUME_PHASE_PACKER:
2519 	{
2520 		bool was_enabled = vdo_get_compressing(vdo);
2521 		bool enable = vdo->device_config->compression;
2522 
2523 		if (enable != was_enabled)
2524 			WRITE_ONCE(vdo->compressing, enable);
2525 		vdo_log_info("compression is %s", (enable ? "enabled" : "disabled"));
2526 
2527 		vdo_resume_packer(vdo->packer, completion);
2528 		return;
2529 	}
2530 
2531 	case RESUME_PHASE_FLUSHER:
2532 		vdo_resume_flusher(vdo->flusher, completion);
2533 		return;
2534 
2535 	case RESUME_PHASE_DATA_VIOS:
2536 		resume_data_vio_pool(vdo->data_vio_pool, completion);
2537 		return;
2538 
2539 	case RESUME_PHASE_END:
2540 		break;
2541 
2542 	default:
2543 		vdo_set_completion_result(completion, UDS_BAD_STATE);
2544 	}
2545 
2546 	finish_operation_callback(completion);
2547 }
2548 
2549 /**
2550  * grow_logical_callback() - Callback to initiate a grow logical.
2551  * @completion: The admin completion.
2552  *
2553  * Registered in perform_grow_logical().
2554  */
2555 static void grow_logical_callback(struct vdo_completion *completion)
2556 {
2557 	struct vdo *vdo = completion->vdo;
2558 	int result;
2559 
2560 	assert_admin_phase_thread(vdo, __func__);
2561 
2562 	switch (advance_phase(vdo)) {
2563 	case GROW_LOGICAL_PHASE_START:
2564 		if (vdo_is_read_only(vdo)) {
2565 			vdo_log_error_strerror(VDO_READ_ONLY,
2566 					       "Can't grow logical size of a read-only VDO");
2567 			vdo_set_completion_result(completion, VDO_READ_ONLY);
2568 			break;
2569 		}
2570 
2571 		result = vdo_start_operation(&vdo->admin.state,
2572 					     VDO_ADMIN_STATE_SUSPENDED_OPERATION);
2573 		if (result != VDO_SUCCESS) {
2574 			vdo_continue_completion(completion, result);
2575 			return;
2576 		}
2577 
2578 		vdo->states.vdo.config.logical_blocks = vdo->block_map->next_entry_count;
2579 		vdo_save_components(vdo, completion);
2580 		return;
2581 
2582 	case GROW_LOGICAL_PHASE_GROW_BLOCK_MAP:
2583 		vdo_grow_block_map(vdo->block_map, completion);
2584 		return;
2585 
2586 	case GROW_LOGICAL_PHASE_END:
2587 		break;
2588 
2589 	case GROW_LOGICAL_PHASE_ERROR:
2590 		vdo_enter_read_only_mode(vdo, completion->result);
2591 		break;
2592 
2593 	default:
2594 		vdo_set_completion_result(completion, UDS_BAD_STATE);
2595 	}
2596 
2597 	finish_operation_callback(completion);
2598 }
2599 
2600 /**
2601  * handle_logical_growth_error() - Handle an error during the grow physical process.
2602  * @completion: The admin completion.
2603  */
2604 static void handle_logical_growth_error(struct vdo_completion *completion)
2605 {
2606 	struct vdo *vdo = completion->vdo;
2607 
2608 	if (vdo->admin.phase == GROW_LOGICAL_PHASE_GROW_BLOCK_MAP) {
2609 		/*
2610 		 * We've failed to write the new size in the super block, so set our in memory
2611 		 * config back to the old size.
2612 		 */
2613 		vdo->states.vdo.config.logical_blocks = vdo->block_map->entry_count;
2614 		vdo_abandon_block_map_growth(vdo->block_map);
2615 	}
2616 
2617 	vdo->admin.phase = GROW_LOGICAL_PHASE_ERROR;
2618 	grow_logical_callback(completion);
2619 }
2620 
2621 /**
2622  * perform_grow_logical() - Grow the logical size of the vdo.
2623  * @vdo: The vdo to grow.
2624  * @new_logical_blocks: The size to which the vdo should be grown.
2625  *
2626  * Context: This method may only be called when the vdo has been suspended and must not be called
2627  * from a base thread.
2628  *
2629  * Return: VDO_SUCCESS or an error.
2630  */
2631 static int perform_grow_logical(struct vdo *vdo, block_count_t new_logical_blocks)
2632 {
2633 	int result;
2634 
2635 	if (vdo->device_config->logical_blocks == new_logical_blocks) {
2636 		/*
2637 		 * A table was loaded for which we prepared to grow, but a table without that
2638 		 * growth was what we are resuming with.
2639 		 */
2640 		vdo_abandon_block_map_growth(vdo->block_map);
2641 		return VDO_SUCCESS;
2642 	}
2643 
2644 	vdo_log_info("Resizing logical to %llu",
2645 		     (unsigned long long) new_logical_blocks);
2646 	if (vdo->block_map->next_entry_count != new_logical_blocks)
2647 		return VDO_PARAMETER_MISMATCH;
2648 
2649 	result = perform_admin_operation(vdo, GROW_LOGICAL_PHASE_START,
2650 					 grow_logical_callback,
2651 					 handle_logical_growth_error, "grow logical");
2652 	if (result != VDO_SUCCESS)
2653 		return result;
2654 
2655 	vdo_log_info("Logical blocks now %llu", (unsigned long long) new_logical_blocks);
2656 	return VDO_SUCCESS;
2657 }
2658 
2659 static void copy_callback(int read_err, unsigned long write_err, void *context)
2660 {
2661 	struct vdo_completion *completion = context;
2662 	int result = (((read_err == 0) && (write_err == 0)) ? VDO_SUCCESS : -EIO);
2663 
2664 	vdo_continue_completion(completion, result);
2665 }
2666 
2667 static void partition_to_region(struct partition *partition, struct vdo *vdo,
2668 				struct dm_io_region *region)
2669 {
2670 	physical_block_number_t pbn = partition->offset - vdo->geometry.bio_offset;
2671 
2672 	*region = (struct dm_io_region) {
2673 		.bdev = vdo_get_backing_device(vdo),
2674 		.sector = pbn * VDO_SECTORS_PER_BLOCK,
2675 		.count = partition->count * VDO_SECTORS_PER_BLOCK,
2676 	};
2677 }
2678 
2679 /**
2680  * copy_partition() - Copy a partition from the location specified in the current layout to that in
2681  *                    the next layout.
2682  * @vdo: The vdo preparing to grow.
2683  * @id: The ID of the partition to copy.
2684  * @parent: The completion to notify when the copy is complete.
2685  */
2686 static void copy_partition(struct vdo *vdo, enum partition_id id,
2687 			   struct vdo_completion *parent)
2688 {
2689 	struct dm_io_region read_region, write_regions[1];
2690 	struct partition *from = vdo_get_known_partition(&vdo->layout, id);
2691 	struct partition *to = vdo_get_known_partition(&vdo->next_layout, id);
2692 
2693 	partition_to_region(from, vdo, &read_region);
2694 	partition_to_region(to, vdo, &write_regions[0]);
2695 	dm_kcopyd_copy(vdo->partition_copier, &read_region, 1, write_regions, 0,
2696 		       copy_callback, parent);
2697 }
2698 
2699 /**
2700  * grow_physical_callback() - Callback to initiate a grow physical.
2701  * @completion: The admin completion.
2702  *
2703  * Registered in perform_grow_physical().
2704  */
2705 static void grow_physical_callback(struct vdo_completion *completion)
2706 {
2707 	struct vdo *vdo = completion->vdo;
2708 	int result;
2709 
2710 	assert_admin_phase_thread(vdo, __func__);
2711 
2712 	switch (advance_phase(vdo)) {
2713 	case GROW_PHYSICAL_PHASE_START:
2714 		if (vdo_is_read_only(vdo)) {
2715 			vdo_log_error_strerror(VDO_READ_ONLY,
2716 					       "Can't grow physical size of a read-only VDO");
2717 			vdo_set_completion_result(completion, VDO_READ_ONLY);
2718 			break;
2719 		}
2720 
2721 		result = vdo_start_operation(&vdo->admin.state,
2722 					     VDO_ADMIN_STATE_SUSPENDED_OPERATION);
2723 		if (result != VDO_SUCCESS) {
2724 			vdo_continue_completion(completion, result);
2725 			return;
2726 		}
2727 
2728 		/* Copy the journal into the new layout. */
2729 		copy_partition(vdo, VDO_RECOVERY_JOURNAL_PARTITION, completion);
2730 		return;
2731 
2732 	case GROW_PHYSICAL_PHASE_COPY_SUMMARY:
2733 		copy_partition(vdo, VDO_SLAB_SUMMARY_PARTITION, completion);
2734 		return;
2735 
2736 	case GROW_PHYSICAL_PHASE_UPDATE_COMPONENTS:
2737 		vdo_uninitialize_layout(&vdo->layout);
2738 		vdo->layout = vdo->next_layout;
2739 		vdo_forget(vdo->next_layout.head);
2740 		vdo->states.vdo.config.physical_blocks = vdo->layout.size;
2741 		vdo_update_slab_depot_size(vdo->depot);
2742 		vdo_save_components(vdo, completion);
2743 		return;
2744 
2745 	case GROW_PHYSICAL_PHASE_USE_NEW_SLABS:
2746 		vdo_use_new_slabs(vdo->depot, completion);
2747 		return;
2748 
2749 	case GROW_PHYSICAL_PHASE_END:
2750 		vdo->depot->summary_origin =
2751 			vdo_get_known_partition(&vdo->layout,
2752 						VDO_SLAB_SUMMARY_PARTITION)->offset;
2753 		vdo->recovery_journal->origin =
2754 			vdo_get_known_partition(&vdo->layout,
2755 						VDO_RECOVERY_JOURNAL_PARTITION)->offset;
2756 		break;
2757 
2758 	case GROW_PHYSICAL_PHASE_ERROR:
2759 		vdo_enter_read_only_mode(vdo, completion->result);
2760 		break;
2761 
2762 	default:
2763 		vdo_set_completion_result(completion, UDS_BAD_STATE);
2764 	}
2765 
2766 	vdo_uninitialize_layout(&vdo->next_layout);
2767 	finish_operation_callback(completion);
2768 }
2769 
2770 /**
2771  * handle_physical_growth_error() - Handle an error during the grow physical process.
2772  * @completion: The sub-task completion.
2773  */
2774 static void handle_physical_growth_error(struct vdo_completion *completion)
2775 {
2776 	completion->vdo->admin.phase = GROW_PHYSICAL_PHASE_ERROR;
2777 	grow_physical_callback(completion);
2778 }
2779 
2780 /**
2781  * perform_grow_physical() - Grow the physical size of the vdo.
2782  * @vdo: The vdo to resize.
2783  * @new_physical_blocks: The new physical size in blocks.
2784  *
2785  * Context: This method may only be called when the vdo has been suspended and must not be called
2786  * from a base thread.
2787  *
2788  * Return: VDO_SUCCESS or an error.
2789  */
2790 static int perform_grow_physical(struct vdo *vdo, block_count_t new_physical_blocks)
2791 {
2792 	int result;
2793 	block_count_t new_depot_size, prepared_depot_size;
2794 	block_count_t old_physical_blocks = vdo->states.vdo.config.physical_blocks;
2795 
2796 	/* Skip any noop grows. */
2797 	if (old_physical_blocks == new_physical_blocks)
2798 		return VDO_SUCCESS;
2799 
2800 	if (new_physical_blocks != vdo->next_layout.size) {
2801 		/*
2802 		 * Either the VDO isn't prepared to grow, or it was prepared to grow to a different
2803 		 * size. Doing this check here relies on the fact that the call to this method is
2804 		 * done under the dmsetup message lock.
2805 		 */
2806 		vdo_uninitialize_layout(&vdo->next_layout);
2807 		vdo_abandon_new_slabs(vdo->depot);
2808 		return VDO_PARAMETER_MISMATCH;
2809 	}
2810 
2811 	/* Validate that we are prepared to grow appropriately. */
2812 	new_depot_size =
2813 		vdo_get_known_partition(&vdo->next_layout, VDO_SLAB_DEPOT_PARTITION)->count;
2814 	prepared_depot_size = (vdo->depot->new_slabs == NULL) ? 0 : vdo->depot->new_size;
2815 	if (prepared_depot_size != new_depot_size)
2816 		return VDO_PARAMETER_MISMATCH;
2817 
2818 	result = perform_admin_operation(vdo, GROW_PHYSICAL_PHASE_START,
2819 					 grow_physical_callback,
2820 					 handle_physical_growth_error, "grow physical");
2821 	if (result != VDO_SUCCESS)
2822 		return result;
2823 
2824 	vdo_log_info("Physical block count was %llu, now %llu",
2825 		     (unsigned long long) old_physical_blocks,
2826 		     (unsigned long long) new_physical_blocks);
2827 	return VDO_SUCCESS;
2828 }
2829 
2830 /**
2831  * apply_new_vdo_configuration() - Attempt to make any configuration changes from the table being
2832  *                                 resumed.
2833  * @vdo: The vdo being resumed.
2834  * @config: The new device configuration derived from the table with which the vdo is being
2835  *          resumed.
2836  *
2837  * Return: VDO_SUCCESS or an error.
2838  */
2839 static int __must_check apply_new_vdo_configuration(struct vdo *vdo,
2840 						    struct device_config *config)
2841 {
2842 	int result;
2843 
2844 	result = perform_grow_logical(vdo, config->logical_blocks);
2845 	if (result != VDO_SUCCESS) {
2846 		vdo_log_error("grow logical operation failed, result = %d", result);
2847 		return result;
2848 	}
2849 
2850 	result = perform_grow_physical(vdo, config->physical_blocks);
2851 	if (result != VDO_SUCCESS)
2852 		vdo_log_error("resize operation failed, result = %d", result);
2853 
2854 	return result;
2855 }
2856 
/**
 * vdo_preresume_registered() - Start (if necessary) and resume a vdo target.
 * @ti: The device-mapper target being resumed.
 * @vdo: The vdo backing the target.
 *
 * If the vdo is only pre-loaded, loads it first; then applies any logical/physical size changes
 * from the new table, and finally resumes the vdo. Runs with the per-device instance registered
 * on this thread (see vdo_preresume()).
 *
 * Return: VDO_SUCCESS or an error.
 */
static int vdo_preresume_registered(struct dm_target *ti, struct vdo *vdo)
{
	struct device_config *config = ti->private;
	const char *device_name = vdo_get_device_name(ti);
	block_count_t backing_blocks;
	int result;

	/* The backing device must be large enough for the configured physical size. */
	backing_blocks = get_underlying_device_block_count(vdo);
	if (backing_blocks < config->physical_blocks) {
		/* FIXME: can this still happen? */
		vdo_log_error("resume of device '%s' failed: backing device has %llu blocks but VDO physical size is %llu blocks",
			      device_name, (unsigned long long) backing_blocks,
			      (unsigned long long) config->physical_blocks);
		return -EINVAL;
	}

	if (vdo_get_admin_state(vdo) == VDO_ADMIN_STATE_PRE_LOADED) {
		/* First resume after construction: load the vdo's metadata. */
		vdo_log_info("starting device '%s'", device_name);
		result = perform_admin_operation(vdo, LOAD_PHASE_START, load_callback,
						 handle_load_error, "load");
		if (result == VDO_UNSUPPORTED_VERSION) {
			 /*
			  * A component version is not supported. This can happen when the
			  * recovery journal metadata is in an old version format. Abort the
			  * load without saving the state.
			  */
			vdo->suspend_type = VDO_ADMIN_STATE_SUSPENDING;
			perform_admin_operation(vdo, SUSPEND_PHASE_START,
						suspend_callback, suspend_callback,
						"suspend");
			return result;
		}

		if ((result != VDO_SUCCESS) && (result != VDO_READ_ONLY)) {
			/*
			 * Something has gone very wrong. Make sure everything has drained and
			 * leave the device in an unresumable state.
			 */
			vdo_log_error_strerror(result,
					       "Start failed, could not load VDO metadata");
			vdo->suspend_type = VDO_ADMIN_STATE_STOPPING;
			perform_admin_operation(vdo, SUSPEND_PHASE_START,
						suspend_callback, suspend_callback,
						"suspend");
			return result;
		}

		/* Even if the VDO is read-only, it is now able to handle read requests. */
		vdo_log_info("device '%s' started", device_name);
	}

	vdo_log_info("resuming device '%s'", device_name);

	/* If this fails, the VDO was not in a state to be resumed. This should never happen. */
	result = apply_new_vdo_configuration(vdo, config);
	BUG_ON(result == VDO_INVALID_ADMIN_STATE);

	/*
	 * Now that we've tried to modify the vdo, the new config *is* the config, whether the
	 * modifications worked or not.
	 */
	vdo->device_config = config;

	/*
	 * Any error here is highly unexpected and the state of the vdo is questionable, so we mark
	 * it read-only in memory. Because we are suspended, the read-only state will not be
	 * written to disk.
	 */
	if (result != VDO_SUCCESS) {
		vdo_log_error_strerror(result,
				       "Commit of modifications to device '%s' failed",
				       device_name);
		vdo_enter_read_only_mode(vdo, result);
		return result;
	}

	if (vdo_get_admin_state(vdo)->normal) {
		/* The VDO was just started, so we don't need to resume it. */
		return VDO_SUCCESS;
	}

	result = perform_admin_operation(vdo, RESUME_PHASE_START, resume_callback,
					 resume_callback, "resume");
	BUG_ON(result == VDO_INVALID_ADMIN_STATE);
	if (result == VDO_READ_ONLY) {
		/* Even if the vdo is read-only, it has still resumed. */
		result = VDO_SUCCESS;
	}

	if (result != VDO_SUCCESS)
		vdo_log_error("resume of device '%s' failed with error: %d", device_name,
			      result);

	return result;
}
2952 
2953 static int vdo_preresume(struct dm_target *ti)
2954 {
2955 	struct registered_thread instance_thread;
2956 	struct vdo *vdo = get_vdo_for_target(ti);
2957 	int result;
2958 
2959 	vdo_register_thread_device_id(&instance_thread, &vdo->instance);
2960 	result = vdo_preresume_registered(ti, vdo);
2961 	if ((result == VDO_PARAMETER_MISMATCH) || (result == VDO_INVALID_ADMIN_STATE) ||
2962 	    (result == VDO_UNSUPPORTED_VERSION))
2963 		result = -EINVAL;
2964 	vdo_unregister_thread_device_id();
2965 	return vdo_status_to_errno(result);
2966 }
2967 
2968 static void vdo_resume(struct dm_target *ti)
2969 {
2970 	struct registered_thread instance_thread;
2971 
2972 	vdo_register_thread_device_id(&instance_thread,
2973 				      &get_vdo_for_target(ti)->instance);
2974 	vdo_log_info("device '%s' resumed", vdo_get_device_name(ti));
2975 	vdo_unregister_thread_device_id();
2976 }
2977 
2978 /*
2979  * If anything changes that affects how user tools will interact with vdo, update the version
2980  * number and make sure documentation about the change is complete so tools can properly update
2981  * their management code.
2982  */
2983 static struct target_type vdo_target_bio = {
2984 	.features = DM_TARGET_SINGLETON,
2985 	.name = "vdo",
2986 	.version = { 9, 2, 0 },
2987 	.module = THIS_MODULE,
2988 	.ctr = vdo_ctr,
2989 	.dtr = vdo_dtr,
2990 	.io_hints = vdo_io_hints,
2991 	.iterate_devices = vdo_iterate_devices,
2992 	.map = vdo_map_bio,
2993 	.message = vdo_message,
2994 	.status = vdo_status,
2995 	.presuspend = vdo_presuspend,
2996 	.postsuspend = vdo_postsuspend,
2997 	.preresume = vdo_preresume,
2998 	.resume = vdo_resume,
2999 };
3000 
/* Set once dm_register_target() has succeeded; checked during module teardown. */
static bool dm_registered;
3002 
/*
 * Tear down module-global state, undoing what vdo_init() set up. Also used as the error
 * cleanup path of vdo_init() itself, hence the dm_registered guard.
 */
static void vdo_module_destroy(void)
{
	vdo_log_debug("unloading");

	if (dm_registered)
		dm_unregister_target(&vdo_target_bio);

	/* All targets must be gone before the module can unload. */
	VDO_ASSERT_LOG_ONLY(instances.count == 0,
			    "should have no instance numbers still in use, but have %u",
			    instances.count);
	vdo_free(instances.words);
	memset(&instances, 0, sizeof(struct instance_tracker));
}
3016 
3017 static int __init vdo_init(void)
3018 {
3019 	int result = 0;
3020 
3021 	/* Memory tracking must be initialized first for accurate accounting. */
3022 	vdo_memory_init();
3023 	vdo_initialize_threads_mutex();
3024 	vdo_initialize_thread_device_registry();
3025 	vdo_initialize_device_registry_once();
3026 
3027 	/* Add VDO errors to the set of errors registered by the indexer. */
3028 	result = vdo_register_status_codes();
3029 	if (result != VDO_SUCCESS) {
3030 		vdo_log_error("vdo_register_status_codes failed %d", result);
3031 		vdo_module_destroy();
3032 		return result;
3033 	}
3034 
3035 	result = dm_register_target(&vdo_target_bio);
3036 	if (result < 0) {
3037 		vdo_log_error("dm_register_target failed %d", result);
3038 		vdo_module_destroy();
3039 		return result;
3040 	}
3041 	dm_registered = true;
3042 
3043 	return result;
3044 }
3045 
/* Module teardown: unregister the target and release all global state. */
static void __exit vdo_exit(void)
{
	vdo_module_destroy();
	/* Memory tracking cleanup must be done last. */
	vdo_memory_exit();
}
3052 
module_init(vdo_init);
module_exit(vdo_exit);

/* Runtime-adjustable log verbosity (0644: readable and writable via module parameters). */
module_param_named(log_level, vdo_log_level, uint, 0644);
MODULE_PARM_DESC(log_level, "Log level for log messages");

MODULE_DESCRIPTION(DM_NAME " target for transparent deduplication");
MODULE_AUTHOR("Red Hat, Inc.");
MODULE_LICENSE("GPL");
3062