xref: /linux/drivers/md/dm-vdo/dm-vdo-target.c (revision 001821b0e79716c4e17c71d8e053a23599a7a508)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include <linux/atomic.h>
7 #include <linux/bitops.h>
8 #include <linux/completion.h>
9 #include <linux/delay.h>
10 #include <linux/device-mapper.h>
11 #include <linux/err.h>
12 #include <linux/module.h>
13 #include <linux/mutex.h>
14 #include <linux/spinlock.h>
15 
16 #include "admin-state.h"
17 #include "block-map.h"
18 #include "completion.h"
19 #include "constants.h"
20 #include "data-vio.h"
21 #include "dedupe.h"
22 #include "dump.h"
23 #include "encodings.h"
24 #include "errors.h"
25 #include "flush.h"
26 #include "io-submitter.h"
27 #include "logger.h"
28 #include "memory-alloc.h"
29 #include "message-stats.h"
30 #include "recovery-journal.h"
31 #include "repair.h"
32 #include "slab-depot.h"
33 #include "status-codes.h"
34 #include "string-utils.h"
35 #include "thread-device.h"
36 #include "thread-registry.h"
37 #include "thread-utils.h"
38 #include "types.h"
39 #include "vdo.h"
40 #include "vio.h"
41 
42 enum admin_phases {
43 	GROW_LOGICAL_PHASE_START,
44 	GROW_LOGICAL_PHASE_GROW_BLOCK_MAP,
45 	GROW_LOGICAL_PHASE_END,
46 	GROW_LOGICAL_PHASE_ERROR,
47 	GROW_PHYSICAL_PHASE_START,
48 	GROW_PHYSICAL_PHASE_COPY_SUMMARY,
49 	GROW_PHYSICAL_PHASE_UPDATE_COMPONENTS,
50 	GROW_PHYSICAL_PHASE_USE_NEW_SLABS,
51 	GROW_PHYSICAL_PHASE_END,
52 	GROW_PHYSICAL_PHASE_ERROR,
53 	LOAD_PHASE_START,
54 	LOAD_PHASE_LOAD_DEPOT,
55 	LOAD_PHASE_MAKE_DIRTY,
56 	LOAD_PHASE_PREPARE_TO_ALLOCATE,
57 	LOAD_PHASE_SCRUB_SLABS,
58 	LOAD_PHASE_DATA_REDUCTION,
59 	LOAD_PHASE_FINISHED,
60 	LOAD_PHASE_DRAIN_JOURNAL,
61 	LOAD_PHASE_WAIT_FOR_READ_ONLY,
62 	PRE_LOAD_PHASE_START,
63 	PRE_LOAD_PHASE_LOAD_COMPONENTS,
64 	PRE_LOAD_PHASE_END,
65 	PREPARE_GROW_PHYSICAL_PHASE_START,
66 	RESUME_PHASE_START,
67 	RESUME_PHASE_ALLOW_READ_ONLY_MODE,
68 	RESUME_PHASE_DEDUPE,
69 	RESUME_PHASE_DEPOT,
70 	RESUME_PHASE_JOURNAL,
71 	RESUME_PHASE_BLOCK_MAP,
72 	RESUME_PHASE_LOGICAL_ZONES,
73 	RESUME_PHASE_PACKER,
74 	RESUME_PHASE_FLUSHER,
75 	RESUME_PHASE_DATA_VIOS,
76 	RESUME_PHASE_END,
77 	SUSPEND_PHASE_START,
78 	SUSPEND_PHASE_PACKER,
79 	SUSPEND_PHASE_DATA_VIOS,
80 	SUSPEND_PHASE_DEDUPE,
81 	SUSPEND_PHASE_FLUSHES,
82 	SUSPEND_PHASE_LOGICAL_ZONES,
83 	SUSPEND_PHASE_BLOCK_MAP,
84 	SUSPEND_PHASE_JOURNAL,
85 	SUSPEND_PHASE_DEPOT,
86 	SUSPEND_PHASE_READ_ONLY_WAIT,
87 	SUSPEND_PHASE_WRITE_SUPER_BLOCK,
88 	SUSPEND_PHASE_END,
89 };
90 
91 static const char * const ADMIN_PHASE_NAMES[] = {
92 	"GROW_LOGICAL_PHASE_START",
93 	"GROW_LOGICAL_PHASE_GROW_BLOCK_MAP",
94 	"GROW_LOGICAL_PHASE_END",
95 	"GROW_LOGICAL_PHASE_ERROR",
96 	"GROW_PHYSICAL_PHASE_START",
97 	"GROW_PHYSICAL_PHASE_COPY_SUMMARY",
98 	"GROW_PHYSICAL_PHASE_UPDATE_COMPONENTS",
99 	"GROW_PHYSICAL_PHASE_USE_NEW_SLABS",
100 	"GROW_PHYSICAL_PHASE_END",
101 	"GROW_PHYSICAL_PHASE_ERROR",
102 	"LOAD_PHASE_START",
103 	"LOAD_PHASE_LOAD_DEPOT",
104 	"LOAD_PHASE_MAKE_DIRTY",
105 	"LOAD_PHASE_PREPARE_TO_ALLOCATE",
106 	"LOAD_PHASE_SCRUB_SLABS",
107 	"LOAD_PHASE_DATA_REDUCTION",
108 	"LOAD_PHASE_FINISHED",
109 	"LOAD_PHASE_DRAIN_JOURNAL",
110 	"LOAD_PHASE_WAIT_FOR_READ_ONLY",
111 	"PRE_LOAD_PHASE_START",
112 	"PRE_LOAD_PHASE_LOAD_COMPONENTS",
113 	"PRE_LOAD_PHASE_END",
114 	"PREPARE_GROW_PHYSICAL_PHASE_START",
115 	"RESUME_PHASE_START",
116 	"RESUME_PHASE_ALLOW_READ_ONLY_MODE",
117 	"RESUME_PHASE_DEDUPE",
118 	"RESUME_PHASE_DEPOT",
119 	"RESUME_PHASE_JOURNAL",
120 	"RESUME_PHASE_BLOCK_MAP",
121 	"RESUME_PHASE_LOGICAL_ZONES",
122 	"RESUME_PHASE_PACKER",
123 	"RESUME_PHASE_FLUSHER",
124 	"RESUME_PHASE_DATA_VIOS",
125 	"RESUME_PHASE_END",
126 	"SUSPEND_PHASE_START",
127 	"SUSPEND_PHASE_PACKER",
128 	"SUSPEND_PHASE_DATA_VIOS",
129 	"SUSPEND_PHASE_DEDUPE",
130 	"SUSPEND_PHASE_FLUSHES",
131 	"SUSPEND_PHASE_LOGICAL_ZONES",
132 	"SUSPEND_PHASE_BLOCK_MAP",
133 	"SUSPEND_PHASE_JOURNAL",
134 	"SUSPEND_PHASE_DEPOT",
135 	"SUSPEND_PHASE_READ_ONLY_WAIT",
136 	"SUSPEND_PHASE_WRITE_SUPER_BLOCK",
137 	"SUSPEND_PHASE_END",
138 };
139 
140 /* If we bump this, update the arrays below */
141 #define TABLE_VERSION 4
142 
143 /* arrays for handling different table versions */
144 static const u8 REQUIRED_ARGC[] = { 10, 12, 9, 7, 6 };
145 /* pool name no longer used. only here for verification of older versions */
146 static const u8 POOL_NAME_ARG_INDEX[] = { 8, 10, 8 };
147 
148 /*
149  * Track in-use instance numbers using a flat bit array.
150  *
151  * O(n) run time isn't ideal, but if we have 1000 VDO devices in use simultaneously we still only
152  * need to scan 16 words, so it's not likely to be a big deal compared to other resource usage.
153  */
154 
155 /*
156  * This minimum size for the bit array creates a numbering space of 0-999, which allows
157  * successive starts of the same volume to have different instance numbers in any
158  * reasonably-sized test. Changing instances on restart allows vdoMonReport to detect that
159  * the ephemeral stats have reset to zero.
160  */
161 #define BIT_COUNT_MINIMUM 1000
162 /* Grow the bit array by this many bits when needed */
163 #define BIT_COUNT_INCREMENT 100
164 
165 struct instance_tracker {
166 	unsigned int bit_count;
167 	unsigned long *words;
168 	unsigned int count;
169 	unsigned int next;
170 };
171 
172 static DEFINE_MUTEX(instances_lock);
173 static struct instance_tracker instances;
174 
175 /**
176  * free_device_config() - Free a device config created by parse_device_config().
177  * @config: The config to free.
178  */
179 static void free_device_config(struct device_config *config)
180 {
181 	if (config == NULL)
182 		return;
183 
184 	if (config->owned_device != NULL)
185 		dm_put_device(config->owning_target, config->owned_device);
186 
187 	vdo_free(config->parent_device_name);
188 	vdo_free(config->original_string);
189 
190 	/* Reduce the chance a use-after-free (as in BZ 1669960) happens to work. */
191 	memset(config, 0, sizeof(*config));
192 	vdo_free(config);
193 }
194 
195 /**
196  * get_version_number() - Decide the version number from argv.
197  *
198  * @argc: The number of table values.
199  * @argv: The array of table values.
200  * @error_ptr: A pointer to return a error string in.
201  * @version_ptr: A pointer to return the version.
202  *
203  * Return: VDO_SUCCESS or an error code.
204  */
205 static int get_version_number(int argc, char **argv, char **error_ptr,
206 			      unsigned int *version_ptr)
207 {
208 	/* version, if it exists, is in a form of V<n> */
209 	if (sscanf(argv[0], "V%u", version_ptr) == 1) {
210 		if (*version_ptr < 1 || *version_ptr > TABLE_VERSION) {
211 			*error_ptr = "Unknown version number detected";
212 			return VDO_BAD_CONFIGURATION;
213 		}
214 	} else {
215 		/* V0 actually has no version number in the table string */
216 		*version_ptr = 0;
217 	}
218 
219 	/*
220 	 * V0 and V1 have no optional parameters. There will always be a parameter for thread
221 	 * config, even if it's a "." to show it's an empty list.
222 	 */
223 	if (*version_ptr <= 1) {
224 		if (argc != REQUIRED_ARGC[*version_ptr]) {
225 			*error_ptr = "Incorrect number of arguments for version";
226 			return VDO_BAD_CONFIGURATION;
227 		}
228 	} else if (argc < REQUIRED_ARGC[*version_ptr]) {
229 		*error_ptr = "Incorrect number of arguments for version";
230 		return VDO_BAD_CONFIGURATION;
231 	}
232 
233 	if (*version_ptr != TABLE_VERSION) {
234 		vdo_log_warning("Detected version mismatch between kernel module and tools kernel: %d, tool: %d",
235 				TABLE_VERSION, *version_ptr);
236 		vdo_log_warning("Please consider upgrading management tools to match kernel.");
237 	}
238 	return VDO_SUCCESS;
239 }
240 
241 /* Free a list of non-NULL string pointers, and then the list itself. */
242 static void free_string_array(char **string_array)
243 {
244 	unsigned int offset;
245 
246 	for (offset = 0; string_array[offset] != NULL; offset++)
247 		vdo_free(string_array[offset]);
248 	vdo_free(string_array);
249 }
250 
251 /*
252  * Split the input string into substrings, separated at occurrences of the indicated character,
253  * returning a null-terminated list of string pointers.
254  *
255  * The string pointers and the pointer array itself should both be freed with vdo_free() when no
256  * longer needed. This can be done with vdo_free_string_array (below) if the pointers in the array
257  * are not changed. Since the array and copied strings are allocated by this function, it may only
258  * be used in contexts where allocation is permitted.
259  *
260  * Empty substrings are not ignored; that is, returned substrings may be empty strings if the
261  * separator occurs twice in a row.
262  */
263 static int split_string(const char *string, char separator, char ***substring_array_ptr)
264 {
265 	unsigned int current_substring = 0, substring_count = 1;
266 	const char *s;
267 	char **substrings;
268 	int result;
269 	ptrdiff_t length;
270 
271 	for (s = string; *s != 0; s++) {
272 		if (*s == separator)
273 			substring_count++;
274 	}
275 
276 	result = vdo_allocate(substring_count + 1, char *, "string-splitting array",
277 			      &substrings);
278 	if (result != VDO_SUCCESS)
279 		return result;
280 
281 	for (s = string; *s != 0; s++) {
282 		if (*s == separator) {
283 			ptrdiff_t length = s - string;
284 
285 			result = vdo_allocate(length + 1, char, "split string",
286 					      &substrings[current_substring]);
287 			if (result != VDO_SUCCESS) {
288 				free_string_array(substrings);
289 				return result;
290 			}
291 			/*
292 			 * Trailing NUL is already in place after allocation; deal with the zero or
293 			 * more non-NUL bytes in the string.
294 			 */
295 			if (length > 0)
296 				memcpy(substrings[current_substring], string, length);
297 			string = s + 1;
298 			current_substring++;
299 			BUG_ON(current_substring >= substring_count);
300 		}
301 	}
302 	/* Process final string, with no trailing separator. */
303 	BUG_ON(current_substring != (substring_count - 1));
304 	length = strlen(string);
305 
306 	result = vdo_allocate(length + 1, char, "split string",
307 			      &substrings[current_substring]);
308 	if (result != VDO_SUCCESS) {
309 		free_string_array(substrings);
310 		return result;
311 	}
312 	memcpy(substrings[current_substring], string, length);
313 	current_substring++;
314 	/* substrings[current_substring] is NULL already */
315 	*substring_array_ptr = substrings;
316 	return VDO_SUCCESS;
317 }
318 
319 /*
320  * Join the input substrings into one string, joined with the indicated character, returning a
321  * string. array_length is a bound on the number of valid elements in substring_array, in case it
322  * is not NULL-terminated.
323  */
324 static int join_strings(char **substring_array, size_t array_length, char separator,
325 			char **string_ptr)
326 {
327 	size_t string_length = 0;
328 	size_t i;
329 	int result;
330 	char *output, *current_position;
331 
332 	for (i = 0; (i < array_length) && (substring_array[i] != NULL); i++)
333 		string_length += strlen(substring_array[i]) + 1;
334 
335 	result = vdo_allocate(string_length, char, __func__, &output);
336 	if (result != VDO_SUCCESS)
337 		return result;
338 
339 	current_position = &output[0];
340 
341 	for (i = 0; (i < array_length) && (substring_array[i] != NULL); i++) {
342 		current_position = vdo_append_to_buffer(current_position,
343 							output + string_length, "%s",
344 							substring_array[i]);
345 		*current_position = separator;
346 		current_position++;
347 	}
348 
349 	/* We output one too many separators; replace the last with a zero byte. */
350 	if (current_position != output)
351 		*(current_position - 1) = '\0';
352 
353 	*string_ptr = output;
354 	return VDO_SUCCESS;
355 }
356 
357 /**
358  * parse_bool() - Parse a two-valued option into a bool.
359  * @bool_str: The string value to convert to a bool.
360  * @true_str: The string value which should be converted to true.
361  * @false_str: The string value which should be converted to false.
362  * @bool_ptr: A pointer to return the bool value in.
363  *
364  * Return: VDO_SUCCESS or an error if bool_str is neither true_str nor false_str.
365  */
366 static inline int __must_check parse_bool(const char *bool_str, const char *true_str,
367 					  const char *false_str, bool *bool_ptr)
368 {
369 	bool value = false;
370 
371 	if (strcmp(bool_str, true_str) == 0)
372 		value = true;
373 	else if (strcmp(bool_str, false_str) == 0)
374 		value = false;
375 	else
376 		return VDO_BAD_CONFIGURATION;
377 
378 	*bool_ptr = value;
379 	return VDO_SUCCESS;
380 }
381 
382 /**
383  * process_one_thread_config_spec() - Process one component of a thread parameter configuration
384  *				      string and update the configuration data structure.
385  * @thread_param_type: The type of thread specified.
386  * @count: The thread count requested.
387  * @config: The configuration data structure to update.
388  *
389  * If the thread count requested is invalid, a message is logged and -EINVAL returned. If the
390  * thread name is unknown, a message is logged but no error is returned.
391  *
392  * Return: VDO_SUCCESS or -EINVAL
393  */
394 static int process_one_thread_config_spec(const char *thread_param_type,
395 					  unsigned int count,
396 					  struct thread_count_config *config)
397 {
398 	/* Handle limited thread parameters */
399 	if (strcmp(thread_param_type, "bioRotationInterval") == 0) {
400 		if (count == 0) {
401 			vdo_log_error("thread config string error:  'bioRotationInterval' of at least 1 is required");
402 			return -EINVAL;
403 		} else if (count > VDO_BIO_ROTATION_INTERVAL_LIMIT) {
404 			vdo_log_error("thread config string error: 'bioRotationInterval' cannot be higher than %d",
405 				      VDO_BIO_ROTATION_INTERVAL_LIMIT);
406 			return -EINVAL;
407 		}
408 		config->bio_rotation_interval = count;
409 		return VDO_SUCCESS;
410 	}
411 	if (strcmp(thread_param_type, "logical") == 0) {
412 		if (count > MAX_VDO_LOGICAL_ZONES) {
413 			vdo_log_error("thread config string error: at most %d 'logical' threads are allowed",
414 				      MAX_VDO_LOGICAL_ZONES);
415 			return -EINVAL;
416 		}
417 		config->logical_zones = count;
418 		return VDO_SUCCESS;
419 	}
420 	if (strcmp(thread_param_type, "physical") == 0) {
421 		if (count > MAX_VDO_PHYSICAL_ZONES) {
422 			vdo_log_error("thread config string error: at most %d 'physical' threads are allowed",
423 				      MAX_VDO_PHYSICAL_ZONES);
424 			return -EINVAL;
425 		}
426 		config->physical_zones = count;
427 		return VDO_SUCCESS;
428 	}
429 	/* Handle other thread count parameters */
430 	if (count > MAXIMUM_VDO_THREADS) {
431 		vdo_log_error("thread config string error: at most %d '%s' threads are allowed",
432 			      MAXIMUM_VDO_THREADS, thread_param_type);
433 		return -EINVAL;
434 	}
435 	if (strcmp(thread_param_type, "hash") == 0) {
436 		config->hash_zones = count;
437 		return VDO_SUCCESS;
438 	}
439 	if (strcmp(thread_param_type, "cpu") == 0) {
440 		if (count == 0) {
441 			vdo_log_error("thread config string error: at least one 'cpu' thread required");
442 			return -EINVAL;
443 		}
444 		config->cpu_threads = count;
445 		return VDO_SUCCESS;
446 	}
447 	if (strcmp(thread_param_type, "ack") == 0) {
448 		config->bio_ack_threads = count;
449 		return VDO_SUCCESS;
450 	}
451 	if (strcmp(thread_param_type, "bio") == 0) {
452 		if (count == 0) {
453 			vdo_log_error("thread config string error: at least one 'bio' thread required");
454 			return -EINVAL;
455 		}
456 		config->bio_threads = count;
457 		return VDO_SUCCESS;
458 	}
459 
460 	/*
461 	 * Don't fail, just log. This will handle version mismatches between user mode tools and
462 	 * kernel.
463 	 */
464 	vdo_log_info("unknown thread parameter type \"%s\"", thread_param_type);
465 	return VDO_SUCCESS;
466 }
467 
468 /**
469  * parse_one_thread_config_spec() - Parse one component of a thread parameter configuration string
470  *				    and update the configuration data structure.
471  * @spec: The thread parameter specification string.
472  * @config: The configuration data to be updated.
473  */
474 static int parse_one_thread_config_spec(const char *spec,
475 					struct thread_count_config *config)
476 {
477 	unsigned int count;
478 	char **fields;
479 	int result;
480 
481 	result = split_string(spec, '=', &fields);
482 	if (result != VDO_SUCCESS)
483 		return result;
484 
485 	if ((fields[0] == NULL) || (fields[1] == NULL) || (fields[2] != NULL)) {
486 		vdo_log_error("thread config string error: expected thread parameter assignment, saw \"%s\"",
487 			      spec);
488 		free_string_array(fields);
489 		return -EINVAL;
490 	}
491 
492 	result = kstrtouint(fields[1], 10, &count);
493 	if (result) {
494 		vdo_log_error("thread config string error: integer value needed, found \"%s\"",
495 			      fields[1]);
496 		free_string_array(fields);
497 		return result;
498 	}
499 
500 	result = process_one_thread_config_spec(fields[0], count, config);
501 	free_string_array(fields);
502 	return result;
503 }
504 
505 /**
506  * parse_thread_config_string() - Parse the configuration string passed and update the specified
507  *				  counts and other parameters of various types of threads to be
508  *				  created.
509  * @string: Thread parameter configuration string.
510  * @config: The thread configuration data to update.
511  *
512  * The configuration string should contain one or more comma-separated specs of the form
513  * "typename=number"; the supported type names are "cpu", "ack", "bio", "bioRotationInterval",
514  * "logical", "physical", and "hash".
515  *
516  * If an error occurs during parsing of a single key/value pair, we deem it serious enough to stop
517  * further parsing.
518  *
519  * This function can't set the "reason" value the caller wants to pass back, because we'd want to
520  * format it to say which field was invalid, and we can't allocate the "reason" strings
521  * dynamically. So if an error occurs, we'll log the details and pass back an error.
522  *
523  * Return: VDO_SUCCESS or -EINVAL or -ENOMEM
524  */
525 static int parse_thread_config_string(const char *string,
526 				      struct thread_count_config *config)
527 {
528 	int result = VDO_SUCCESS;
529 	char **specs;
530 
531 	if (strcmp(".", string) != 0) {
532 		unsigned int i;
533 
534 		result = split_string(string, ',', &specs);
535 		if (result != VDO_SUCCESS)
536 			return result;
537 
538 		for (i = 0; specs[i] != NULL; i++) {
539 			result = parse_one_thread_config_spec(specs[i], config);
540 			if (result != VDO_SUCCESS)
541 				break;
542 		}
543 		free_string_array(specs);
544 	}
545 	return result;
546 }
547 
548 /**
549  * process_one_key_value_pair() - Process one component of an optional parameter string and update
550  *				  the configuration data structure.
551  * @key: The optional parameter key name.
552  * @value: The optional parameter value.
553  * @config: The configuration data structure to update.
554  *
555  * If the value requested is invalid, a message is logged and -EINVAL returned. If the key is
556  * unknown, a message is logged but no error is returned.
557  *
558  * Return: VDO_SUCCESS or -EINVAL
559  */
560 static int process_one_key_value_pair(const char *key, unsigned int value,
561 				      struct device_config *config)
562 {
563 	/* Non thread optional parameters */
564 	if (strcmp(key, "maxDiscard") == 0) {
565 		if (value == 0) {
566 			vdo_log_error("optional parameter error: at least one max discard block required");
567 			return -EINVAL;
568 		}
569 		/* Max discard sectors in blkdev_issue_discard is UINT_MAX >> 9 */
570 		if (value > (UINT_MAX / VDO_BLOCK_SIZE)) {
571 			vdo_log_error("optional parameter error: at most %d max discard	 blocks are allowed",
572 				      UINT_MAX / VDO_BLOCK_SIZE);
573 			return -EINVAL;
574 		}
575 		config->max_discard_blocks = value;
576 		return VDO_SUCCESS;
577 	}
578 	/* Handles unknown key names */
579 	return process_one_thread_config_spec(key, value, &config->thread_counts);
580 }
581 
582 /**
583  * parse_one_key_value_pair() - Parse one key/value pair and update the configuration data
584  *				structure.
585  * @key: The optional key name.
586  * @value: The optional value.
587  * @config: The configuration data to be updated.
588  *
589  * Return: VDO_SUCCESS or error.
590  */
591 static int parse_one_key_value_pair(const char *key, const char *value,
592 				    struct device_config *config)
593 {
594 	unsigned int count;
595 	int result;
596 
597 	if (strcmp(key, "deduplication") == 0)
598 		return parse_bool(value, "on", "off", &config->deduplication);
599 
600 	if (strcmp(key, "compression") == 0)
601 		return parse_bool(value, "on", "off", &config->compression);
602 
603 	/* The remaining arguments must have integral values. */
604 	result = kstrtouint(value, 10, &count);
605 	if (result) {
606 		vdo_log_error("optional config string error: integer value needed, found \"%s\"",
607 			      value);
608 		return result;
609 	}
610 	return process_one_key_value_pair(key, count, config);
611 }
612 
613 /**
614  * parse_key_value_pairs() - Parse all key/value pairs from a list of arguments.
615  * @argc: The total number of arguments in list.
616  * @argv: The list of key/value pairs.
617  * @config: The device configuration data to update.
618  *
619  * If an error occurs during parsing of a single key/value pair, we deem it serious enough to stop
620  * further parsing.
621  *
622  * This function can't set the "reason" value the caller wants to pass back, because we'd want to
623  * format it to say which field was invalid, and we can't allocate the "reason" strings
624  * dynamically. So if an error occurs, we'll log the details and return the error.
625  *
626  * Return: VDO_SUCCESS or error
627  */
628 static int parse_key_value_pairs(int argc, char **argv, struct device_config *config)
629 {
630 	int result = VDO_SUCCESS;
631 
632 	while (argc) {
633 		result = parse_one_key_value_pair(argv[0], argv[1], config);
634 		if (result != VDO_SUCCESS)
635 			break;
636 
637 		argc -= 2;
638 		argv += 2;
639 	}
640 
641 	return result;
642 }
643 
644 /**
645  * parse_optional_arguments() - Parse the configuration string passed in for optional arguments.
646  * @arg_set: The structure holding the arguments to parse.
647  * @error_ptr: Pointer to a buffer to hold the error string.
648  * @config: Pointer to device configuration data to update.
649  *
650  * For V0/V1 configurations, there will only be one optional parameter; the thread configuration.
651  * The configuration string should contain one or more comma-separated specs of the form
652  * "typename=number"; the supported type names are "cpu", "ack", "bio", "bioRotationInterval",
653  * "logical", "physical", and "hash".
654  *
655  * For V2 configurations and beyond, there could be any number of arguments. They should contain
656  * one or more key/value pairs separated by a space.
657  *
658  * Return: VDO_SUCCESS or error
659  */
660 static int parse_optional_arguments(struct dm_arg_set *arg_set, char **error_ptr,
661 				    struct device_config *config)
662 {
663 	int result = VDO_SUCCESS;
664 
665 	if (config->version == 0 || config->version == 1) {
666 		result = parse_thread_config_string(arg_set->argv[0],
667 						    &config->thread_counts);
668 		if (result != VDO_SUCCESS) {
669 			*error_ptr = "Invalid thread-count configuration";
670 			return VDO_BAD_CONFIGURATION;
671 		}
672 	} else {
673 		if ((arg_set->argc % 2) != 0) {
674 			*error_ptr = "Odd number of optional arguments given but they should be <key> <value> pairs";
675 			return VDO_BAD_CONFIGURATION;
676 		}
677 		result = parse_key_value_pairs(arg_set->argc, arg_set->argv, config);
678 		if (result != VDO_SUCCESS) {
679 			*error_ptr = "Invalid optional argument configuration";
680 			return VDO_BAD_CONFIGURATION;
681 		}
682 	}
683 	return result;
684 }
685 
686 /**
687  * handle_parse_error() - Handle a parsing error.
688  * @config: The config to free.
689  * @error_ptr: A place to store a constant string about the error.
690  * @error_str: A constant string to store in error_ptr.
691  */
692 static void handle_parse_error(struct device_config *config, char **error_ptr,
693 			       char *error_str)
694 {
695 	free_device_config(config);
696 	*error_ptr = error_str;
697 }
698 
699 /**
700  * parse_device_config() - Convert the dmsetup table into a struct device_config.
701  * @argc: The number of table values.
702  * @argv: The array of table values.
703  * @ti: The target structure for this table.
704  * @config_ptr: A pointer to return the allocated config.
705  *
706  * Return: VDO_SUCCESS or an error code.
707  */
708 static int parse_device_config(int argc, char **argv, struct dm_target *ti,
709 			       struct device_config **config_ptr)
710 {
711 	bool enable_512e;
712 	size_t logical_bytes = to_bytes(ti->len);
713 	struct dm_arg_set arg_set;
714 	char **error_ptr = &ti->error;
715 	struct device_config *config = NULL;
716 	int result;
717 
718 	if ((logical_bytes % VDO_BLOCK_SIZE) != 0) {
719 		handle_parse_error(config, error_ptr,
720 				   "Logical size must be a multiple of 4096");
721 		return VDO_BAD_CONFIGURATION;
722 	}
723 
724 	if (argc == 0) {
725 		handle_parse_error(config, error_ptr, "Incorrect number of arguments");
726 		return VDO_BAD_CONFIGURATION;
727 	}
728 
729 	result = vdo_allocate(1, struct device_config, "device_config", &config);
730 	if (result != VDO_SUCCESS) {
731 		handle_parse_error(config, error_ptr,
732 				   "Could not allocate config structure");
733 		return VDO_BAD_CONFIGURATION;
734 	}
735 
736 	config->owning_target = ti;
737 	config->logical_blocks = logical_bytes / VDO_BLOCK_SIZE;
738 	INIT_LIST_HEAD(&config->config_list);
739 
740 	/* Save the original string. */
741 	result = join_strings(argv, argc, ' ', &config->original_string);
742 	if (result != VDO_SUCCESS) {
743 		handle_parse_error(config, error_ptr, "Could not populate string");
744 		return VDO_BAD_CONFIGURATION;
745 	}
746 
747 	vdo_log_info("table line: %s", config->original_string);
748 
749 	config->thread_counts = (struct thread_count_config) {
750 		.bio_ack_threads = 1,
751 		.bio_threads = DEFAULT_VDO_BIO_SUBMIT_QUEUE_COUNT,
752 		.bio_rotation_interval = DEFAULT_VDO_BIO_SUBMIT_QUEUE_ROTATE_INTERVAL,
753 		.cpu_threads = 1,
754 		.logical_zones = 0,
755 		.physical_zones = 0,
756 		.hash_zones = 0,
757 	};
758 	config->max_discard_blocks = 1;
759 	config->deduplication = true;
760 	config->compression = false;
761 
762 	arg_set.argc = argc;
763 	arg_set.argv = argv;
764 
765 	result = get_version_number(argc, argv, error_ptr, &config->version);
766 	if (result != VDO_SUCCESS) {
767 		/* get_version_number sets error_ptr itself. */
768 		handle_parse_error(config, error_ptr, *error_ptr);
769 		return result;
770 	}
771 	/* Move the arg pointer forward only if the argument was there. */
772 	if (config->version >= 1)
773 		dm_shift_arg(&arg_set);
774 
775 	result = vdo_duplicate_string(dm_shift_arg(&arg_set), "parent device name",
776 				      &config->parent_device_name);
777 	if (result != VDO_SUCCESS) {
778 		handle_parse_error(config, error_ptr,
779 				   "Could not copy parent device name");
780 		return VDO_BAD_CONFIGURATION;
781 	}
782 
783 	/* Get the physical blocks, if known. */
784 	if (config->version >= 1) {
785 		result = kstrtoull(dm_shift_arg(&arg_set), 10, &config->physical_blocks);
786 		if (result != VDO_SUCCESS) {
787 			handle_parse_error(config, error_ptr,
788 					   "Invalid physical block count");
789 			return VDO_BAD_CONFIGURATION;
790 		}
791 	}
792 
793 	/* Get the logical block size and validate */
794 	result = parse_bool(dm_shift_arg(&arg_set), "512", "4096", &enable_512e);
795 	if (result != VDO_SUCCESS) {
796 		handle_parse_error(config, error_ptr, "Invalid logical block size");
797 		return VDO_BAD_CONFIGURATION;
798 	}
799 	config->logical_block_size = (enable_512e ? 512 : 4096);
800 
801 	/* Skip past the two no longer used read cache options. */
802 	if (config->version <= 1)
803 		dm_consume_args(&arg_set, 2);
804 
805 	/* Get the page cache size. */
806 	result = kstrtouint(dm_shift_arg(&arg_set), 10, &config->cache_size);
807 	if (result != VDO_SUCCESS) {
808 		handle_parse_error(config, error_ptr,
809 				   "Invalid block map page cache size");
810 		return VDO_BAD_CONFIGURATION;
811 	}
812 
813 	/* Get the block map era length. */
814 	result = kstrtouint(dm_shift_arg(&arg_set), 10, &config->block_map_maximum_age);
815 	if (result != VDO_SUCCESS) {
816 		handle_parse_error(config, error_ptr, "Invalid block map maximum age");
817 		return VDO_BAD_CONFIGURATION;
818 	}
819 
820 	/* Skip past the no longer used MD RAID5 optimization mode */
821 	if (config->version <= 2)
822 		dm_consume_args(&arg_set, 1);
823 
824 	/* Skip past the no longer used write policy setting */
825 	if (config->version <= 3)
826 		dm_consume_args(&arg_set, 1);
827 
828 	/* Skip past the no longer used pool name for older table lines */
829 	if (config->version <= 2) {
830 		/*
831 		 * Make sure the enum to get the pool name from argv directly is still in sync with
832 		 * the parsing of the table line.
833 		 */
834 		if (&arg_set.argv[0] != &argv[POOL_NAME_ARG_INDEX[config->version]]) {
835 			handle_parse_error(config, error_ptr,
836 					   "Pool name not in expected location");
837 			return VDO_BAD_CONFIGURATION;
838 		}
839 		dm_shift_arg(&arg_set);
840 	}
841 
842 	/* Get the optional arguments and validate. */
843 	result = parse_optional_arguments(&arg_set, error_ptr, config);
844 	if (result != VDO_SUCCESS) {
845 		/* parse_optional_arguments sets error_ptr itself. */
846 		handle_parse_error(config, error_ptr, *error_ptr);
847 		return result;
848 	}
849 
850 	/*
851 	 * Logical, physical, and hash zone counts can all be zero; then we get one thread doing
852 	 * everything, our older configuration. If any zone count is non-zero, the others must be
853 	 * as well.
854 	 */
855 	if (((config->thread_counts.logical_zones == 0) !=
856 	     (config->thread_counts.physical_zones == 0)) ||
857 	    ((config->thread_counts.physical_zones == 0) !=
858 	     (config->thread_counts.hash_zones == 0))) {
859 		handle_parse_error(config, error_ptr,
860 				   "Logical, physical, and hash zones counts must all be zero or all non-zero");
861 		return VDO_BAD_CONFIGURATION;
862 	}
863 
864 	if (config->cache_size <
865 	    (2 * MAXIMUM_VDO_USER_VIOS * config->thread_counts.logical_zones)) {
866 		handle_parse_error(config, error_ptr,
867 				   "Insufficient block map cache for logical zones");
868 		return VDO_BAD_CONFIGURATION;
869 	}
870 
871 	result = dm_get_device(ti, config->parent_device_name,
872 			       dm_table_get_mode(ti->table), &config->owned_device);
873 	if (result != 0) {
874 		vdo_log_error("couldn't open device \"%s\": error %d",
875 			      config->parent_device_name, result);
876 		handle_parse_error(config, error_ptr, "Unable to open storage device");
877 		return VDO_BAD_CONFIGURATION;
878 	}
879 
880 	if (config->version == 0) {
881 		u64 device_size = bdev_nr_bytes(config->owned_device->bdev);
882 
883 		config->physical_blocks = device_size / VDO_BLOCK_SIZE;
884 	}
885 
886 	*config_ptr = config;
887 	return result;
888 }
889 
890 static struct vdo *get_vdo_for_target(struct dm_target *ti)
891 {
892 	return ((struct device_config *) ti->private)->vdo;
893 }
894 
895 
896 static int vdo_map_bio(struct dm_target *ti, struct bio *bio)
897 {
898 	struct vdo *vdo = get_vdo_for_target(ti);
899 	struct vdo_work_queue *current_work_queue;
900 	const struct admin_state_code *code = vdo_get_admin_state_code(&vdo->admin.state);
901 
902 	VDO_ASSERT_LOG_ONLY(code->normal, "vdo should not receive bios while in state %s",
903 			    code->name);
904 
905 	/* Count all incoming bios. */
906 	vdo_count_bios(&vdo->stats.bios_in, bio);
907 
908 
909 	/* Handle empty bios.  Empty flush bios are not associated with a vio. */
910 	if ((bio_op(bio) == REQ_OP_FLUSH) || ((bio->bi_opf & REQ_PREFLUSH) != 0)) {
911 		vdo_launch_flush(vdo, bio);
912 		return DM_MAPIO_SUBMITTED;
913 	}
914 
915 	/* This could deadlock, */
916 	current_work_queue = vdo_get_current_work_queue();
917 	BUG_ON((current_work_queue != NULL) &&
918 	       (vdo == vdo_get_work_queue_owner(current_work_queue)->vdo));
919 	vdo_launch_bio(vdo->data_vio_pool, bio);
920 	return DM_MAPIO_SUBMITTED;
921 }
922 
923 static void vdo_io_hints(struct dm_target *ti, struct queue_limits *limits)
924 {
925 	struct vdo *vdo = get_vdo_for_target(ti);
926 
927 	limits->logical_block_size = vdo->device_config->logical_block_size;
928 	limits->physical_block_size = VDO_BLOCK_SIZE;
929 
930 	/* The minimum io size for random io */
931 	blk_limits_io_min(limits, VDO_BLOCK_SIZE);
932 	/* The optimal io size for streamed/sequential io */
933 	blk_limits_io_opt(limits, VDO_BLOCK_SIZE);
934 
935 	/*
936 	 * Sets the maximum discard size that will be passed into VDO. This value comes from a
937 	 * table line value passed in during dmsetup create.
938 	 *
939 	 * The value 1024 is the largest usable value on HD systems. A 2048 sector discard on a
940 	 * busy HD system takes 31 seconds. We should use a value no higher than 1024, which takes
941 	 * 15 to 16 seconds on a busy HD system. However, using large values results in 120 second
942 	 * blocked task warnings in kernel logs. In order to avoid these warnings, we choose to
943 	 * use the smallest reasonable value.
944 	 *
945 	 * The value is used by dm-thin to determine whether to pass down discards. The block layer
946 	 * splits large discards on this boundary when this is set.
947 	 */
948 	limits->max_discard_sectors =
949 		(vdo->device_config->max_discard_blocks * VDO_SECTORS_PER_BLOCK);
950 
951 	/*
952 	 * Force discards to not begin or end with a partial block by stating the granularity is
953 	 * 4k.
954 	 */
955 	limits->discard_granularity = VDO_BLOCK_SIZE;
956 }
957 
958 static int vdo_iterate_devices(struct dm_target *ti, iterate_devices_callout_fn fn,
959 			       void *data)
960 {
961 	struct device_config *config = get_vdo_for_target(ti)->device_config;
962 
963 	return fn(ti, config->owned_device, 0,
964 		  config->physical_blocks * VDO_SECTORS_PER_BLOCK, data);
965 }
966 
967 /*
968  * Status line is:
969  *    <device> <operating mode> <in recovery> <index state> <compression state>
970  *    <used physical blocks> <total physical blocks>
971  */
972 
973 static void vdo_status(struct dm_target *ti, status_type_t status_type,
974 		       unsigned int status_flags, char *result, unsigned int maxlen)
975 {
976 	struct vdo *vdo = get_vdo_for_target(ti);
977 	struct vdo_statistics *stats;
978 	struct device_config *device_config;
979 	/* N.B.: The DMEMIT macro uses the variables named "sz", "result", "maxlen". */
980 	int sz = 0;
981 
982 	switch (status_type) {
983 	case STATUSTYPE_INFO:
984 		/* Report info for dmsetup status */
985 		mutex_lock(&vdo->stats_mutex);
986 		vdo_fetch_statistics(vdo, &vdo->stats_buffer);
987 		stats = &vdo->stats_buffer;
988 
989 		DMEMIT("/dev/%pg %s %s %s %s %llu %llu",
990 		       vdo_get_backing_device(vdo), stats->mode,
991 		       stats->in_recovery_mode ? "recovering" : "-",
992 		       vdo_get_dedupe_index_state_name(vdo->hash_zones),
993 		       vdo_get_compressing(vdo) ? "online" : "offline",
994 		       stats->data_blocks_used + stats->overhead_blocks_used,
995 		       stats->physical_blocks);
996 		mutex_unlock(&vdo->stats_mutex);
997 		break;
998 
999 	case STATUSTYPE_TABLE:
1000 		/* Report the string actually specified in the beginning. */
1001 		device_config = (struct device_config *) ti->private;
1002 		DMEMIT("%s", device_config->original_string);
1003 		break;
1004 
1005 	case STATUSTYPE_IMA:
1006 		/* FIXME: We ought to be more detailed here, but this is what thin does. */
1007 		*result = '\0';
1008 		break;
1009 	}
1010 }
1011 
1012 static block_count_t __must_check get_underlying_device_block_count(const struct vdo *vdo)
1013 {
1014 	return bdev_nr_bytes(vdo_get_backing_device(vdo)) / VDO_BLOCK_SIZE;
1015 }
1016 
1017 static int __must_check process_vdo_message_locked(struct vdo *vdo, unsigned int argc,
1018 						   char **argv)
1019 {
1020 	if ((argc == 2) && (strcasecmp(argv[0], "compression") == 0)) {
1021 		if (strcasecmp(argv[1], "on") == 0) {
1022 			vdo_set_compressing(vdo, true);
1023 			return 0;
1024 		}
1025 
1026 		if (strcasecmp(argv[1], "off") == 0) {
1027 			vdo_set_compressing(vdo, false);
1028 			return 0;
1029 		}
1030 
1031 		vdo_log_warning("invalid argument '%s' to dmsetup compression message",
1032 				argv[1]);
1033 		return -EINVAL;
1034 	}
1035 
1036 	vdo_log_warning("unrecognized dmsetup message '%s' received", argv[0]);
1037 	return -EINVAL;
1038 }
1039 
1040 /*
1041  * If the message is a dump, just do it. Otherwise, check that no other message is being processed,
1042  * and only proceed if so.
1043  * Returns -EBUSY if another message is being processed
1044  */
1045 static int __must_check process_vdo_message(struct vdo *vdo, unsigned int argc,
1046 					    char **argv)
1047 {
1048 	int result;
1049 
1050 	/*
1051 	 * All messages which may be processed in parallel with other messages should be handled
1052 	 * here before the atomic check below. Messages which should be exclusive should be
1053 	 * processed in process_vdo_message_locked().
1054 	 */
1055 
1056 	/* Dump messages should always be processed */
1057 	if (strcasecmp(argv[0], "dump") == 0)
1058 		return vdo_dump(vdo, argc, argv, "dmsetup message");
1059 
1060 	if (argc == 1) {
1061 		if (strcasecmp(argv[0], "dump-on-shutdown") == 0) {
1062 			vdo->dump_on_shutdown = true;
1063 			return 0;
1064 		}
1065 
1066 		/* Index messages should always be processed */
1067 		if ((strcasecmp(argv[0], "index-close") == 0) ||
1068 		    (strcasecmp(argv[0], "index-create") == 0) ||
1069 		    (strcasecmp(argv[0], "index-disable") == 0) ||
1070 		    (strcasecmp(argv[0], "index-enable") == 0))
1071 			return vdo_message_dedupe_index(vdo->hash_zones, argv[0]);
1072 	}
1073 
1074 	if (atomic_cmpxchg(&vdo->processing_message, 0, 1) != 0)
1075 		return -EBUSY;
1076 
1077 	result = process_vdo_message_locked(vdo, argc, argv);
1078 
1079 	/* Pairs with the implicit barrier in cmpxchg just above */
1080 	smp_wmb();
1081 	atomic_set(&vdo->processing_message, 0);
1082 	return result;
1083 }
1084 
1085 static int vdo_message(struct dm_target *ti, unsigned int argc, char **argv,
1086 		       char *result_buffer, unsigned int maxlen)
1087 {
1088 	struct registered_thread allocating_thread, instance_thread;
1089 	struct vdo *vdo;
1090 	int result;
1091 
1092 	if (argc == 0) {
1093 		vdo_log_warning("unspecified dmsetup message");
1094 		return -EINVAL;
1095 	}
1096 
1097 	vdo = get_vdo_for_target(ti);
1098 	vdo_register_allocating_thread(&allocating_thread, NULL);
1099 	vdo_register_thread_device_id(&instance_thread, &vdo->instance);
1100 
1101 	/*
1102 	 * Must be done here so we don't map return codes. The code in dm-ioctl expects a 1 for a
1103 	 * return code to look at the buffer and see if it is full or not.
1104 	 */
1105 	if ((argc == 1) && (strcasecmp(argv[0], "stats") == 0)) {
1106 		vdo_write_stats(vdo, result_buffer, maxlen);
1107 		result = 1;
1108 	} else {
1109 		result = vdo_status_to_errno(process_vdo_message(vdo, argc, argv));
1110 	}
1111 
1112 	vdo_unregister_thread_device_id();
1113 	vdo_unregister_allocating_thread();
1114 	return result;
1115 }
1116 
1117 static void configure_target_capabilities(struct dm_target *ti)
1118 {
1119 	ti->discards_supported = 1;
1120 	ti->flush_supported = true;
1121 	ti->num_discard_bios = 1;
1122 	ti->num_flush_bios = 1;
1123 
1124 	/*
1125 	 * If this value changes, please make sure to update the value for max_discard_sectors
1126 	 * accordingly.
1127 	 */
1128 	BUG_ON(dm_set_target_max_io_len(ti, VDO_SECTORS_PER_BLOCK) != 0);
1129 }
1130 
1131 /*
1132  * Implements vdo_filter_fn.
1133  */
1134 static bool vdo_uses_device(struct vdo *vdo, const void *context)
1135 {
1136 	const struct device_config *config = context;
1137 
1138 	return vdo_get_backing_device(vdo)->bd_dev == config->owned_device->bdev->bd_dev;
1139 }
1140 
1141 /**
1142  * get_thread_id_for_phase() - Get the thread id for the current phase of the admin operation in
1143  *                             progress.
1144  */
1145 static thread_id_t __must_check get_thread_id_for_phase(struct vdo *vdo)
1146 {
1147 	switch (vdo->admin.phase) {
1148 	case RESUME_PHASE_PACKER:
1149 	case RESUME_PHASE_FLUSHER:
1150 	case SUSPEND_PHASE_PACKER:
1151 	case SUSPEND_PHASE_FLUSHES:
1152 		return vdo->thread_config.packer_thread;
1153 
1154 	case RESUME_PHASE_DATA_VIOS:
1155 	case SUSPEND_PHASE_DATA_VIOS:
1156 		return vdo->thread_config.cpu_thread;
1157 
1158 	case LOAD_PHASE_DRAIN_JOURNAL:
1159 	case RESUME_PHASE_JOURNAL:
1160 	case SUSPEND_PHASE_JOURNAL:
1161 		return vdo->thread_config.journal_thread;
1162 
1163 	default:
1164 		return vdo->thread_config.admin_thread;
1165 	}
1166 }
1167 
1168 static struct vdo_completion *prepare_admin_completion(struct vdo *vdo,
1169 						       vdo_action_fn callback,
1170 						       vdo_action_fn error_handler)
1171 {
1172 	struct vdo_completion *completion = &vdo->admin.completion;
1173 
1174 	/*
1175 	 * We can't use vdo_prepare_completion_for_requeue() here because we don't want to reset
1176 	 * any error in the completion.
1177 	 */
1178 	completion->callback = callback;
1179 	completion->error_handler = error_handler;
1180 	completion->callback_thread_id = get_thread_id_for_phase(vdo);
1181 	completion->requeue = true;
1182 	return completion;
1183 }
1184 
1185 /**
1186  * advance_phase() - Increment the phase of the current admin operation and prepare the admin
1187  *                   completion to run on the thread for the next phase.
1188  * @vdo: The on which an admin operation is being performed
1189  *
1190  * Return: The current phase
1191  */
1192 static u32 advance_phase(struct vdo *vdo)
1193 {
1194 	u32 phase = vdo->admin.phase++;
1195 
1196 	vdo->admin.completion.callback_thread_id = get_thread_id_for_phase(vdo);
1197 	vdo->admin.completion.requeue = true;
1198 	return phase;
1199 }
1200 
1201 /*
1202  * Perform an administrative operation (load, suspend, grow logical, or grow physical). This method
1203  * should not be called from vdo threads.
1204  */
1205 static int perform_admin_operation(struct vdo *vdo, u32 starting_phase,
1206 				   vdo_action_fn callback, vdo_action_fn error_handler,
1207 				   const char *type)
1208 {
1209 	int result;
1210 	struct vdo_administrator *admin = &vdo->admin;
1211 
1212 	if (atomic_cmpxchg(&admin->busy, 0, 1) != 0) {
1213 		return vdo_log_error_strerror(VDO_COMPONENT_BUSY,
1214 					      "Can't start %s operation, another operation is already in progress",
1215 					      type);
1216 	}
1217 
1218 	admin->phase = starting_phase;
1219 	reinit_completion(&admin->callback_sync);
1220 	vdo_reset_completion(&admin->completion);
1221 	vdo_launch_completion(prepare_admin_completion(vdo, callback, error_handler));
1222 
1223 	/*
1224 	 * Using the "interruptible" interface means that Linux will not log a message when we wait
1225 	 * for more than 120 seconds.
1226 	 */
1227 	while (wait_for_completion_interruptible(&admin->callback_sync)) {
1228 		/* However, if we get a signal in a user-mode process, we could spin... */
1229 		fsleep(1000);
1230 	}
1231 
1232 	result = admin->completion.result;
1233 	/* pairs with implicit barrier in cmpxchg above */
1234 	smp_wmb();
1235 	atomic_set(&admin->busy, 0);
1236 	return result;
1237 }
1238 
1239 /* Assert that we are operating on the correct thread for the current phase. */
1240 static void assert_admin_phase_thread(struct vdo *vdo, const char *what)
1241 {
1242 	VDO_ASSERT_LOG_ONLY(vdo_get_callback_thread_id() == get_thread_id_for_phase(vdo),
1243 			    "%s on correct thread for %s", what,
1244 			    ADMIN_PHASE_NAMES[vdo->admin.phase]);
1245 }
1246 
1247 /**
1248  * finish_operation_callback() - Callback to finish an admin operation.
1249  * @completion: The admin_completion.
1250  */
1251 static void finish_operation_callback(struct vdo_completion *completion)
1252 {
1253 	struct vdo_administrator *admin = &completion->vdo->admin;
1254 
1255 	vdo_finish_operation(&admin->state, completion->result);
1256 	complete(&admin->callback_sync);
1257 }
1258 
1259 /**
1260  * decode_from_super_block() - Decode the VDO state from the super block and validate that it is
1261  *                             correct.
1262  * @vdo: The vdo being loaded.
1263  *
1264  * On error from this method, the component states must be destroyed explicitly. If this method
1265  * returns successfully, the component states must not be destroyed.
1266  *
1267  * Return: VDO_SUCCESS or an error.
1268  */
1269 static int __must_check decode_from_super_block(struct vdo *vdo)
1270 {
1271 	const struct device_config *config = vdo->device_config;
1272 	int result;
1273 
1274 	result = vdo_decode_component_states(vdo->super_block.buffer, &vdo->geometry,
1275 					     &vdo->states);
1276 	if (result != VDO_SUCCESS)
1277 		return result;
1278 
1279 	vdo_set_state(vdo, vdo->states.vdo.state);
1280 	vdo->load_state = vdo->states.vdo.state;
1281 
1282 	/*
1283 	 * If the device config specifies a larger logical size than was recorded in the super
1284 	 * block, just accept it.
1285 	 */
1286 	if (vdo->states.vdo.config.logical_blocks < config->logical_blocks) {
1287 		vdo_log_warning("Growing logical size: a logical size of %llu blocks was specified, but that differs from the %llu blocks configured in the vdo super block",
1288 				(unsigned long long) config->logical_blocks,
1289 				(unsigned long long) vdo->states.vdo.config.logical_blocks);
1290 		vdo->states.vdo.config.logical_blocks = config->logical_blocks;
1291 	}
1292 
1293 	result = vdo_validate_component_states(&vdo->states, vdo->geometry.nonce,
1294 					       config->physical_blocks,
1295 					       config->logical_blocks);
1296 	if (result != VDO_SUCCESS)
1297 		return result;
1298 
1299 	vdo->layout = vdo->states.layout;
1300 	return VDO_SUCCESS;
1301 }
1302 
1303 /**
1304  * decode_vdo() - Decode the component data portion of a super block and fill in the corresponding
1305  *                portions of the vdo being loaded.
1306  * @vdo: The vdo being loaded.
1307  *
1308  * This will also allocate the recovery journal and slab depot. If this method is called with an
1309  * asynchronous layer (i.e. a thread config which specifies at least one base thread), the block
1310  * map and packer will be constructed as well.
1311  *
1312  * Return: VDO_SUCCESS or an error.
1313  */
1314 static int __must_check decode_vdo(struct vdo *vdo)
1315 {
1316 	block_count_t maximum_age, journal_length;
1317 	struct partition *partition;
1318 	int result;
1319 
1320 	result = decode_from_super_block(vdo);
1321 	if (result != VDO_SUCCESS) {
1322 		vdo_destroy_component_states(&vdo->states);
1323 		return result;
1324 	}
1325 
1326 	maximum_age = vdo_convert_maximum_age(vdo->device_config->block_map_maximum_age);
1327 	journal_length =
1328 		vdo_get_recovery_journal_length(vdo->states.vdo.config.recovery_journal_size);
1329 	if (maximum_age > (journal_length / 2)) {
1330 		return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
1331 					      "maximum age: %llu exceeds limit %llu",
1332 					      (unsigned long long) maximum_age,
1333 					      (unsigned long long) (journal_length / 2));
1334 	}
1335 
1336 	if (maximum_age == 0) {
1337 		return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
1338 					      "maximum age must be greater than 0");
1339 	}
1340 
1341 	result = vdo_enable_read_only_entry(vdo);
1342 	if (result != VDO_SUCCESS)
1343 		return result;
1344 
1345 	partition = vdo_get_known_partition(&vdo->layout,
1346 					    VDO_RECOVERY_JOURNAL_PARTITION);
1347 	result = vdo_decode_recovery_journal(vdo->states.recovery_journal,
1348 					     vdo->states.vdo.nonce, vdo, partition,
1349 					     vdo->states.vdo.complete_recoveries,
1350 					     vdo->states.vdo.config.recovery_journal_size,
1351 					     &vdo->recovery_journal);
1352 	if (result != VDO_SUCCESS)
1353 		return result;
1354 
1355 	partition = vdo_get_known_partition(&vdo->layout, VDO_SLAB_SUMMARY_PARTITION);
1356 	result = vdo_decode_slab_depot(vdo->states.slab_depot, vdo, partition,
1357 				       &vdo->depot);
1358 	if (result != VDO_SUCCESS)
1359 		return result;
1360 
1361 	result = vdo_decode_block_map(vdo->states.block_map,
1362 				      vdo->states.vdo.config.logical_blocks, vdo,
1363 				      vdo->recovery_journal, vdo->states.vdo.nonce,
1364 				      vdo->device_config->cache_size, maximum_age,
1365 				      &vdo->block_map);
1366 	if (result != VDO_SUCCESS)
1367 		return result;
1368 
1369 	result = vdo_make_physical_zones(vdo, &vdo->physical_zones);
1370 	if (result != VDO_SUCCESS)
1371 		return result;
1372 
1373 	/* The logical zones depend on the physical zones already existing. */
1374 	result = vdo_make_logical_zones(vdo, &vdo->logical_zones);
1375 	if (result != VDO_SUCCESS)
1376 		return result;
1377 
1378 	return vdo_make_hash_zones(vdo, &vdo->hash_zones);
1379 }
1380 
1381 /**
1382  * pre_load_callback() - Callback to initiate a pre-load, registered in vdo_initialize().
1383  * @completion: The admin completion.
1384  */
1385 static void pre_load_callback(struct vdo_completion *completion)
1386 {
1387 	struct vdo *vdo = completion->vdo;
1388 	int result;
1389 
1390 	assert_admin_phase_thread(vdo, __func__);
1391 
1392 	switch (advance_phase(vdo)) {
1393 	case PRE_LOAD_PHASE_START:
1394 		result = vdo_start_operation(&vdo->admin.state,
1395 					     VDO_ADMIN_STATE_PRE_LOADING);
1396 		if (result != VDO_SUCCESS) {
1397 			vdo_continue_completion(completion, result);
1398 			return;
1399 		}
1400 
1401 		vdo_load_super_block(vdo, completion);
1402 		return;
1403 
1404 	case PRE_LOAD_PHASE_LOAD_COMPONENTS:
1405 		vdo_continue_completion(completion, decode_vdo(vdo));
1406 		return;
1407 
1408 	case PRE_LOAD_PHASE_END:
1409 		break;
1410 
1411 	default:
1412 		vdo_set_completion_result(completion, UDS_BAD_STATE);
1413 	}
1414 
1415 	finish_operation_callback(completion);
1416 }
1417 
1418 static void release_instance(unsigned int instance)
1419 {
1420 	mutex_lock(&instances_lock);
1421 	if (instance >= instances.bit_count) {
1422 		VDO_ASSERT_LOG_ONLY(false,
1423 				    "instance number %u must be less than bit count %u",
1424 				    instance, instances.bit_count);
1425 	} else if (test_bit(instance, instances.words) == 0) {
1426 		VDO_ASSERT_LOG_ONLY(false, "instance number %u must be allocated", instance);
1427 	} else {
1428 		__clear_bit(instance, instances.words);
1429 		instances.count -= 1;
1430 	}
1431 	mutex_unlock(&instances_lock);
1432 }
1433 
1434 static void set_device_config(struct dm_target *ti, struct vdo *vdo,
1435 			      struct device_config *config)
1436 {
1437 	list_del_init(&config->config_list);
1438 	list_add_tail(&config->config_list, &vdo->device_config_list);
1439 	config->vdo = vdo;
1440 	ti->private = config;
1441 	configure_target_capabilities(ti);
1442 }
1443 
1444 static int vdo_initialize(struct dm_target *ti, unsigned int instance,
1445 			  struct device_config *config)
1446 {
1447 	struct vdo *vdo;
1448 	int result;
1449 	u64 block_size = VDO_BLOCK_SIZE;
1450 	u64 logical_size = to_bytes(ti->len);
1451 	block_count_t logical_blocks = logical_size / block_size;
1452 
1453 	vdo_log_info("loading device '%s'", vdo_get_device_name(ti));
1454 	vdo_log_debug("Logical block size     = %llu", (u64) config->logical_block_size);
1455 	vdo_log_debug("Logical blocks         = %llu", logical_blocks);
1456 	vdo_log_debug("Physical block size    = %llu", (u64) block_size);
1457 	vdo_log_debug("Physical blocks        = %llu", config->physical_blocks);
1458 	vdo_log_debug("Block map cache blocks = %u", config->cache_size);
1459 	vdo_log_debug("Block map maximum age  = %u", config->block_map_maximum_age);
1460 	vdo_log_debug("Deduplication          = %s", (config->deduplication ? "on" : "off"));
1461 	vdo_log_debug("Compression            = %s", (config->compression ? "on" : "off"));
1462 
1463 	vdo = vdo_find_matching(vdo_uses_device, config);
1464 	if (vdo != NULL) {
1465 		vdo_log_error("Existing vdo already uses device %s",
1466 			      vdo->device_config->parent_device_name);
1467 		ti->error = "Cannot share storage device with already-running VDO";
1468 		return VDO_BAD_CONFIGURATION;
1469 	}
1470 
1471 	result = vdo_make(instance, config, &ti->error, &vdo);
1472 	if (result != VDO_SUCCESS) {
1473 		vdo_log_error("Could not create VDO device. (VDO error %d, message %s)",
1474 			      result, ti->error);
1475 		vdo_destroy(vdo);
1476 		return result;
1477 	}
1478 
1479 	result = perform_admin_operation(vdo, PRE_LOAD_PHASE_START, pre_load_callback,
1480 					 finish_operation_callback, "pre-load");
1481 	if (result != VDO_SUCCESS) {
1482 		ti->error = ((result == VDO_INVALID_ADMIN_STATE) ?
1483 			     "Pre-load is only valid immediately after initialization" :
1484 			     "Cannot load metadata from device");
1485 		vdo_log_error("Could not start VDO device. (VDO error %d, message %s)",
1486 			      result, ti->error);
1487 		vdo_destroy(vdo);
1488 		return result;
1489 	}
1490 
1491 	set_device_config(ti, vdo, config);
1492 	vdo->device_config = config;
1493 	return VDO_SUCCESS;
1494 }
1495 
1496 /* Implements vdo_filter_fn. */
1497 static bool __must_check vdo_is_named(struct vdo *vdo, const void *context)
1498 {
1499 	struct dm_target *ti = vdo->device_config->owning_target;
1500 	const char *device_name = vdo_get_device_name(ti);
1501 
1502 	return strcmp(device_name, context) == 0;
1503 }
1504 
1505 /**
1506  * get_bit_array_size() - Return the number of bytes needed to store a bit array of the specified
1507  *                        capacity in an array of unsigned longs.
1508  * @bit_count: The number of bits the array must hold.
1509  *
1510  * Return: the number of bytes needed for the array representation.
1511  */
1512 static size_t get_bit_array_size(unsigned int bit_count)
1513 {
1514 	/* Round up to a multiple of the word size and convert to a byte count. */
1515 	return (BITS_TO_LONGS(bit_count) * sizeof(unsigned long));
1516 }
1517 
1518 /**
1519  * grow_bit_array() - Re-allocate the bitmap word array so there will more instance numbers that
1520  *                    can be allocated.
1521  *
1522  * Since the array is initially NULL, this also initializes the array the first time we allocate an
1523  * instance number.
1524  *
1525  * Return: VDO_SUCCESS or an error code from the allocation
1526  */
1527 static int grow_bit_array(void)
1528 {
1529 	unsigned int new_count = max(instances.bit_count + BIT_COUNT_INCREMENT,
1530 				     (unsigned int) BIT_COUNT_MINIMUM);
1531 	unsigned long *new_words;
1532 	int result;
1533 
1534 	result = vdo_reallocate_memory(instances.words,
1535 				       get_bit_array_size(instances.bit_count),
1536 				       get_bit_array_size(new_count),
1537 				       "instance number bit array", &new_words);
1538 	if (result != VDO_SUCCESS)
1539 		return result;
1540 
1541 	instances.bit_count = new_count;
1542 	instances.words = new_words;
1543 	return VDO_SUCCESS;
1544 }
1545 
1546 /**
1547  * allocate_instance() - Allocate an instance number.
1548  * @instance_ptr: A point to hold the instance number
1549  *
1550  * Return: VDO_SUCCESS or an error code
1551  *
1552  * This function must be called while holding the instances lock.
1553  */
1554 static int allocate_instance(unsigned int *instance_ptr)
1555 {
1556 	unsigned int instance;
1557 	int result;
1558 
1559 	/* If there are no unallocated instances, grow the bit array. */
1560 	if (instances.count >= instances.bit_count) {
1561 		result = grow_bit_array();
1562 		if (result != VDO_SUCCESS)
1563 			return result;
1564 	}
1565 
1566 	/*
1567 	 * There must be a zero bit somewhere now. Find it, starting just after the last instance
1568 	 * allocated.
1569 	 */
1570 	instance = find_next_zero_bit(instances.words, instances.bit_count,
1571 				      instances.next);
1572 	if (instance >= instances.bit_count) {
1573 		/* Nothing free after next, so wrap around to instance zero. */
1574 		instance = find_first_zero_bit(instances.words, instances.bit_count);
1575 		result = VDO_ASSERT(instance < instances.bit_count,
1576 				    "impossibly, no zero bit found");
1577 		if (result != VDO_SUCCESS)
1578 			return result;
1579 	}
1580 
1581 	__set_bit(instance, instances.words);
1582 	instances.count++;
1583 	instances.next = instance + 1;
1584 	*instance_ptr = instance;
1585 	return VDO_SUCCESS;
1586 }
1587 
1588 static int construct_new_vdo_registered(struct dm_target *ti, unsigned int argc,
1589 					char **argv, unsigned int instance)
1590 {
1591 	int result;
1592 	struct device_config *config;
1593 
1594 	result = parse_device_config(argc, argv, ti, &config);
1595 	if (result != VDO_SUCCESS) {
1596 		vdo_log_error_strerror(result, "parsing failed: %s", ti->error);
1597 		release_instance(instance);
1598 		return -EINVAL;
1599 	}
1600 
1601 	/* Beyond this point, the instance number will be cleaned up for us if needed */
1602 	result = vdo_initialize(ti, instance, config);
1603 	if (result != VDO_SUCCESS) {
1604 		release_instance(instance);
1605 		free_device_config(config);
1606 		return vdo_status_to_errno(result);
1607 	}
1608 
1609 	return VDO_SUCCESS;
1610 }
1611 
1612 static int construct_new_vdo(struct dm_target *ti, unsigned int argc, char **argv)
1613 {
1614 	int result;
1615 	unsigned int instance;
1616 	struct registered_thread instance_thread;
1617 
1618 	mutex_lock(&instances_lock);
1619 	result = allocate_instance(&instance);
1620 	mutex_unlock(&instances_lock);
1621 	if (result != VDO_SUCCESS)
1622 		return -ENOMEM;
1623 
1624 	vdo_register_thread_device_id(&instance_thread, &instance);
1625 	result = construct_new_vdo_registered(ti, argc, argv, instance);
1626 	vdo_unregister_thread_device_id();
1627 	return result;
1628 }
1629 
1630 /**
1631  * check_may_grow_physical() - Callback to check that we're not in recovery mode, used in
1632  *                             vdo_prepare_to_grow_physical().
1633  * @completion: The admin completion.
1634  */
1635 static void check_may_grow_physical(struct vdo_completion *completion)
1636 {
1637 	struct vdo *vdo = completion->vdo;
1638 
1639 	assert_admin_phase_thread(vdo, __func__);
1640 
1641 	/* These checks can only be done from a vdo thread. */
1642 	if (vdo_is_read_only(vdo))
1643 		vdo_set_completion_result(completion, VDO_READ_ONLY);
1644 
1645 	if (vdo_in_recovery_mode(vdo))
1646 		vdo_set_completion_result(completion, VDO_RETRY_AFTER_REBUILD);
1647 
1648 	finish_operation_callback(completion);
1649 }
1650 
1651 static block_count_t get_partition_size(struct layout *layout, enum partition_id id)
1652 {
1653 	return vdo_get_known_partition(layout, id)->count;
1654 }
1655 
1656 /**
1657  * grow_layout() - Make the layout for growing a vdo.
1658  * @vdo: The vdo preparing to grow.
1659  * @old_size: The current size of the vdo.
1660  * @new_size: The size to which the vdo will be grown.
1661  *
1662  * Return: VDO_SUCCESS or an error code.
1663  */
1664 static int grow_layout(struct vdo *vdo, block_count_t old_size, block_count_t new_size)
1665 {
1666 	int result;
1667 	block_count_t min_new_size;
1668 
1669 	if (vdo->next_layout.size == new_size) {
1670 		/* We are already prepared to grow to the new size, so we're done. */
1671 		return VDO_SUCCESS;
1672 	}
1673 
1674 	/* Make a copy completion if there isn't one */
1675 	if (vdo->partition_copier == NULL) {
1676 		vdo->partition_copier = dm_kcopyd_client_create(NULL);
1677 		if (IS_ERR(vdo->partition_copier)) {
1678 			result = PTR_ERR(vdo->partition_copier);
1679 			vdo->partition_copier = NULL;
1680 			return result;
1681 		}
1682 	}
1683 
1684 	/* Free any unused preparation. */
1685 	vdo_uninitialize_layout(&vdo->next_layout);
1686 
1687 	/*
1688 	 * Make a new layout with the existing partition sizes for everything but the slab depot
1689 	 * partition.
1690 	 */
1691 	result = vdo_initialize_layout(new_size, vdo->layout.start,
1692 				       get_partition_size(&vdo->layout,
1693 							  VDO_BLOCK_MAP_PARTITION),
1694 				       get_partition_size(&vdo->layout,
1695 							  VDO_RECOVERY_JOURNAL_PARTITION),
1696 				       get_partition_size(&vdo->layout,
1697 							  VDO_SLAB_SUMMARY_PARTITION),
1698 				       &vdo->next_layout);
1699 	if (result != VDO_SUCCESS) {
1700 		dm_kcopyd_client_destroy(vdo_forget(vdo->partition_copier));
1701 		return result;
1702 	}
1703 
1704 	/* Ensure the new journal and summary are entirely within the added blocks. */
1705 	min_new_size = (old_size +
1706 			get_partition_size(&vdo->next_layout,
1707 					   VDO_SLAB_SUMMARY_PARTITION) +
1708 			get_partition_size(&vdo->next_layout,
1709 					   VDO_RECOVERY_JOURNAL_PARTITION));
1710 	if (min_new_size > new_size) {
1711 		/* Copying the journal and summary would destroy some old metadata. */
1712 		vdo_uninitialize_layout(&vdo->next_layout);
1713 		dm_kcopyd_client_destroy(vdo_forget(vdo->partition_copier));
1714 		return VDO_INCREMENT_TOO_SMALL;
1715 	}
1716 
1717 	return VDO_SUCCESS;
1718 }
1719 
1720 static int prepare_to_grow_physical(struct vdo *vdo, block_count_t new_physical_blocks)
1721 {
1722 	int result;
1723 	block_count_t current_physical_blocks = vdo->states.vdo.config.physical_blocks;
1724 
1725 	vdo_log_info("Preparing to resize physical to %llu",
1726 		     (unsigned long long) new_physical_blocks);
1727 	VDO_ASSERT_LOG_ONLY((new_physical_blocks > current_physical_blocks),
1728 			    "New physical size is larger than current physical size");
1729 	result = perform_admin_operation(vdo, PREPARE_GROW_PHYSICAL_PHASE_START,
1730 					 check_may_grow_physical,
1731 					 finish_operation_callback,
1732 					 "prepare grow-physical");
1733 	if (result != VDO_SUCCESS)
1734 		return result;
1735 
1736 	result = grow_layout(vdo, current_physical_blocks, new_physical_blocks);
1737 	if (result != VDO_SUCCESS)
1738 		return result;
1739 
1740 	result = vdo_prepare_to_grow_slab_depot(vdo->depot,
1741 						vdo_get_known_partition(&vdo->next_layout,
1742 									VDO_SLAB_DEPOT_PARTITION));
1743 	if (result != VDO_SUCCESS) {
1744 		vdo_uninitialize_layout(&vdo->next_layout);
1745 		return result;
1746 	}
1747 
1748 	vdo_log_info("Done preparing to resize physical");
1749 	return VDO_SUCCESS;
1750 }
1751 
1752 /**
1753  * validate_new_device_config() - Check whether a new device config represents a valid modification
1754  *				  to an existing config.
1755  * @to_validate: The new config to validate.
1756  * @config: The existing config.
1757  * @may_grow: Set to true if growing the logical and physical size of the vdo is currently
1758  *	      permitted.
1759  * @error_ptr: A pointer to hold the reason for any error.
1760  *
1761  * Return: VDO_SUCCESS or an error.
1762  */
1763 static int validate_new_device_config(struct device_config *to_validate,
1764 				      struct device_config *config, bool may_grow,
1765 				      char **error_ptr)
1766 {
1767 	if (to_validate->owning_target->begin != config->owning_target->begin) {
1768 		*error_ptr = "Starting sector cannot change";
1769 		return VDO_PARAMETER_MISMATCH;
1770 	}
1771 
1772 	if (to_validate->logical_block_size != config->logical_block_size) {
1773 		*error_ptr = "Logical block size cannot change";
1774 		return VDO_PARAMETER_MISMATCH;
1775 	}
1776 
1777 	if (to_validate->logical_blocks < config->logical_blocks) {
1778 		*error_ptr = "Can't shrink VDO logical size";
1779 		return VDO_PARAMETER_MISMATCH;
1780 	}
1781 
1782 	if (to_validate->cache_size != config->cache_size) {
1783 		*error_ptr = "Block map cache size cannot change";
1784 		return VDO_PARAMETER_MISMATCH;
1785 	}
1786 
1787 	if (to_validate->block_map_maximum_age != config->block_map_maximum_age) {
1788 		*error_ptr = "Block map maximum age cannot change";
1789 		return VDO_PARAMETER_MISMATCH;
1790 	}
1791 
1792 	if (memcmp(&to_validate->thread_counts, &config->thread_counts,
1793 		   sizeof(struct thread_count_config)) != 0) {
1794 		*error_ptr = "Thread configuration cannot change";
1795 		return VDO_PARAMETER_MISMATCH;
1796 	}
1797 
1798 	if (to_validate->physical_blocks < config->physical_blocks) {
1799 		*error_ptr = "Removing physical storage from a VDO is not supported";
1800 		return VDO_NOT_IMPLEMENTED;
1801 	}
1802 
1803 	if (!may_grow && (to_validate->physical_blocks > config->physical_blocks)) {
1804 		*error_ptr = "VDO physical size may not grow in current state";
1805 		return VDO_NOT_IMPLEMENTED;
1806 	}
1807 
1808 	return VDO_SUCCESS;
1809 }
1810 
1811 static int prepare_to_modify(struct dm_target *ti, struct device_config *config,
1812 			     struct vdo *vdo)
1813 {
1814 	int result;
1815 	bool may_grow = (vdo_get_admin_state(vdo) != VDO_ADMIN_STATE_PRE_LOADED);
1816 
1817 	result = validate_new_device_config(config, vdo->device_config, may_grow,
1818 					    &ti->error);
1819 	if (result != VDO_SUCCESS)
1820 		return -EINVAL;
1821 
1822 	if (config->logical_blocks > vdo->device_config->logical_blocks) {
1823 		block_count_t logical_blocks = vdo->states.vdo.config.logical_blocks;
1824 
1825 		vdo_log_info("Preparing to resize logical to %llu",
1826 			     (unsigned long long) config->logical_blocks);
1827 		VDO_ASSERT_LOG_ONLY((config->logical_blocks > logical_blocks),
1828 				    "New logical size is larger than current size");
1829 
1830 		result = vdo_prepare_to_grow_block_map(vdo->block_map,
1831 						       config->logical_blocks);
1832 		if (result != VDO_SUCCESS) {
1833 			ti->error = "Device vdo_prepare_to_grow_logical failed";
1834 			return result;
1835 		}
1836 
1837 		vdo_log_info("Done preparing to resize logical");
1838 	}
1839 
1840 	if (config->physical_blocks > vdo->device_config->physical_blocks) {
1841 		result = prepare_to_grow_physical(vdo, config->physical_blocks);
1842 		if (result != VDO_SUCCESS) {
1843 			if (result == VDO_PARAMETER_MISMATCH) {
1844 				/*
1845 				 * If we don't trap this case, vdo_status_to_errno() will remap
1846 				 * it to -EIO, which is misleading and ahistorical.
1847 				 */
1848 				result = -EINVAL;
1849 			}
1850 
1851 			if (result == VDO_TOO_MANY_SLABS)
1852 				ti->error = "Device vdo_prepare_to_grow_physical failed (specified physical size too big based on formatted slab size)";
1853 			else
1854 				ti->error = "Device vdo_prepare_to_grow_physical failed";
1855 
1856 			return result;
1857 		}
1858 	}
1859 
1860 	if (strcmp(config->parent_device_name, vdo->device_config->parent_device_name) != 0) {
1861 		const char *device_name = vdo_get_device_name(config->owning_target);
1862 
1863 		vdo_log_info("Updating backing device of %s from %s to %s", device_name,
1864 			     vdo->device_config->parent_device_name,
1865 			     config->parent_device_name);
1866 	}
1867 
1868 	return VDO_SUCCESS;
1869 }
1870 
1871 static int update_existing_vdo(const char *device_name, struct dm_target *ti,
1872 			       unsigned int argc, char **argv, struct vdo *vdo)
1873 {
1874 	int result;
1875 	struct device_config *config;
1876 
1877 	result = parse_device_config(argc, argv, ti, &config);
1878 	if (result != VDO_SUCCESS)
1879 		return -EINVAL;
1880 
1881 	vdo_log_info("preparing to modify device '%s'", device_name);
1882 	result = prepare_to_modify(ti, config, vdo);
1883 	if (result != VDO_SUCCESS) {
1884 		free_device_config(config);
1885 		return vdo_status_to_errno(result);
1886 	}
1887 
1888 	set_device_config(ti, vdo, config);
1889 	return VDO_SUCCESS;
1890 }
1891 
1892 static int vdo_ctr(struct dm_target *ti, unsigned int argc, char **argv)
1893 {
1894 	int result;
1895 	struct registered_thread allocating_thread, instance_thread;
1896 	const char *device_name;
1897 	struct vdo *vdo;
1898 
1899 	vdo_register_allocating_thread(&allocating_thread, NULL);
1900 	device_name = vdo_get_device_name(ti);
1901 	vdo = vdo_find_matching(vdo_is_named, device_name);
1902 	if (vdo == NULL) {
1903 		result = construct_new_vdo(ti, argc, argv);
1904 	} else {
1905 		vdo_register_thread_device_id(&instance_thread, &vdo->instance);
1906 		result = update_existing_vdo(device_name, ti, argc, argv, vdo);
1907 		vdo_unregister_thread_device_id();
1908 	}
1909 
1910 	vdo_unregister_allocating_thread();
1911 	return result;
1912 }
1913 
1914 static void vdo_dtr(struct dm_target *ti)
1915 {
1916 	struct device_config *config = ti->private;
1917 	struct vdo *vdo = vdo_forget(config->vdo);
1918 
1919 	list_del_init(&config->config_list);
1920 	if (list_empty(&vdo->device_config_list)) {
1921 		const char *device_name;
1922 
1923 		/* This was the last config referencing the VDO. Free it. */
1924 		unsigned int instance = vdo->instance;
1925 		struct registered_thread allocating_thread, instance_thread;
1926 
1927 		vdo_register_thread_device_id(&instance_thread, &instance);
1928 		vdo_register_allocating_thread(&allocating_thread, NULL);
1929 
1930 		device_name = vdo_get_device_name(ti);
1931 		vdo_log_info("stopping device '%s'", device_name);
1932 		if (vdo->dump_on_shutdown)
1933 			vdo_dump_all(vdo, "device shutdown");
1934 
1935 		vdo_destroy(vdo_forget(vdo));
1936 		vdo_log_info("device '%s' stopped", device_name);
1937 		vdo_unregister_thread_device_id();
1938 		vdo_unregister_allocating_thread();
1939 		release_instance(instance);
1940 	} else if (config == vdo->device_config) {
1941 		/*
1942 		 * The VDO still references this config. Give it a reference to a config that isn't
1943 		 * being destroyed.
1944 		 */
1945 		vdo->device_config = list_first_entry(&vdo->device_config_list,
1946 						      struct device_config, config_list);
1947 	}
1948 
1949 	free_device_config(config);
1950 	ti->private = NULL;
1951 }
1952 
1953 static void vdo_presuspend(struct dm_target *ti)
1954 {
1955 	get_vdo_for_target(ti)->suspend_type =
1956 		(dm_noflush_suspending(ti) ? VDO_ADMIN_STATE_SUSPENDING : VDO_ADMIN_STATE_SAVING);
1957 }
1958 
1959 /**
1960  * write_super_block_for_suspend() - Update the VDO state and save the super block.
1961  * @completion: The admin completion
1962  */
1963 static void write_super_block_for_suspend(struct vdo_completion *completion)
1964 {
1965 	struct vdo *vdo = completion->vdo;
1966 
1967 	switch (vdo_get_state(vdo)) {
1968 	case VDO_DIRTY:
1969 	case VDO_NEW:
1970 		vdo_set_state(vdo, VDO_CLEAN);
1971 		break;
1972 
1973 	case VDO_CLEAN:
1974 	case VDO_READ_ONLY_MODE:
1975 	case VDO_FORCE_REBUILD:
1976 	case VDO_RECOVERING:
1977 	case VDO_REBUILD_FOR_UPGRADE:
1978 		break;
1979 
1980 	case VDO_REPLAYING:
1981 	default:
1982 		vdo_continue_completion(completion, UDS_BAD_STATE);
1983 		return;
1984 	}
1985 
1986 	vdo_save_components(vdo, completion);
1987 }
1988 
1989 /**
1990  * suspend_callback() - Callback to initiate a suspend, registered in vdo_postsuspend().
1991  * @completion: The sub-task completion.
1992  */
1993 static void suspend_callback(struct vdo_completion *completion)
1994 {
1995 	struct vdo *vdo = completion->vdo;
1996 	struct admin_state *state = &vdo->admin.state;
1997 	int result;
1998 
1999 	assert_admin_phase_thread(vdo, __func__);
2000 
2001 	switch (advance_phase(vdo)) {
2002 	case SUSPEND_PHASE_START:
2003 		if (vdo_get_admin_state_code(state)->quiescent) {
2004 			/* Already suspended */
2005 			break;
2006 		}
2007 
2008 		vdo_continue_completion(completion,
2009 					vdo_start_operation(state, vdo->suspend_type));
2010 		return;
2011 
2012 	case SUSPEND_PHASE_PACKER:
2013 		/*
2014 		 * If the VDO was already resumed from a prior suspend while read-only, some of the
2015 		 * components may not have been resumed. By setting a read-only error here, we
2016 		 * guarantee that the result of this suspend will be VDO_READ_ONLY and not
2017 		 * VDO_INVALID_ADMIN_STATE in that case.
2018 		 */
2019 		if (vdo_in_read_only_mode(vdo))
2020 			vdo_set_completion_result(completion, VDO_READ_ONLY);
2021 
2022 		vdo_drain_packer(vdo->packer, completion);
2023 		return;
2024 
2025 	case SUSPEND_PHASE_DATA_VIOS:
2026 		drain_data_vio_pool(vdo->data_vio_pool, completion);
2027 		return;
2028 
2029 	case SUSPEND_PHASE_DEDUPE:
2030 		vdo_drain_hash_zones(vdo->hash_zones, completion);
2031 		return;
2032 
2033 	case SUSPEND_PHASE_FLUSHES:
2034 		vdo_drain_flusher(vdo->flusher, completion);
2035 		return;
2036 
2037 	case SUSPEND_PHASE_LOGICAL_ZONES:
2038 		/*
2039 		 * Attempt to flush all I/O before completing post suspend work. We believe a
2040 		 * suspended device is expected to have persisted all data written before the
2041 		 * suspend, even if it hasn't been flushed yet.
2042 		 */
2043 		result = vdo_synchronous_flush(vdo);
2044 		if (result != VDO_SUCCESS)
2045 			vdo_enter_read_only_mode(vdo, result);
2046 
2047 		vdo_drain_logical_zones(vdo->logical_zones,
2048 					vdo_get_admin_state_code(state), completion);
2049 		return;
2050 
2051 	case SUSPEND_PHASE_BLOCK_MAP:
2052 		vdo_drain_block_map(vdo->block_map, vdo_get_admin_state_code(state),
2053 				    completion);
2054 		return;
2055 
2056 	case SUSPEND_PHASE_JOURNAL:
2057 		vdo_drain_recovery_journal(vdo->recovery_journal,
2058 					   vdo_get_admin_state_code(state), completion);
2059 		return;
2060 
2061 	case SUSPEND_PHASE_DEPOT:
2062 		vdo_drain_slab_depot(vdo->depot, vdo_get_admin_state_code(state),
2063 				     completion);
2064 		return;
2065 
2066 	case SUSPEND_PHASE_READ_ONLY_WAIT:
2067 		vdo_wait_until_not_entering_read_only_mode(completion);
2068 		return;
2069 
2070 	case SUSPEND_PHASE_WRITE_SUPER_BLOCK:
2071 		if (vdo_is_state_suspending(state) || (completion->result != VDO_SUCCESS)) {
2072 			/* If we didn't save the VDO or there was an error, we're done. */
2073 			break;
2074 		}
2075 
2076 		write_super_block_for_suspend(completion);
2077 		return;
2078 
2079 	case SUSPEND_PHASE_END:
2080 		break;
2081 
2082 	default:
2083 		vdo_set_completion_result(completion, UDS_BAD_STATE);
2084 	}
2085 
2086 	finish_operation_callback(completion);
2087 }
2088 
2089 static void vdo_postsuspend(struct dm_target *ti)
2090 {
2091 	struct vdo *vdo = get_vdo_for_target(ti);
2092 	struct registered_thread instance_thread;
2093 	const char *device_name;
2094 	int result;
2095 
2096 	vdo_register_thread_device_id(&instance_thread, &vdo->instance);
2097 	device_name = vdo_get_device_name(vdo->device_config->owning_target);
2098 	vdo_log_info("suspending device '%s'", device_name);
2099 
2100 	/*
2101 	 * It's important to note any error here does not actually stop device-mapper from
2102 	 * suspending the device. All this work is done post suspend.
2103 	 */
2104 	result = perform_admin_operation(vdo, SUSPEND_PHASE_START, suspend_callback,
2105 					 suspend_callback, "suspend");
2106 
2107 	if ((result == VDO_SUCCESS) || (result == VDO_READ_ONLY)) {
2108 		/*
2109 		 * Treat VDO_READ_ONLY as a success since a read-only suspension still leaves the
2110 		 * VDO suspended.
2111 		 */
2112 		vdo_log_info("device '%s' suspended", device_name);
2113 	} else if (result == VDO_INVALID_ADMIN_STATE) {
2114 		vdo_log_error("Suspend invoked while in unexpected state: %s",
2115 			      vdo_get_admin_state(vdo)->name);
2116 	} else {
2117 		vdo_log_error_strerror(result, "Suspend of device '%s' failed",
2118 				       device_name);
2119 	}
2120 
2121 	vdo_unregister_thread_device_id();
2122 }
2123 
2124 /**
2125  * was_new() - Check whether the vdo was new when it was loaded.
2126  * @vdo: The vdo to query.
2127  *
2128  * Return: true if the vdo was new.
2129  */
2130 static bool was_new(const struct vdo *vdo)
2131 {
2132 	return (vdo->load_state == VDO_NEW);
2133 }
2134 
2135 /**
2136  * requires_repair() - Check whether a vdo requires recovery or rebuild.
2137  * @vdo: The vdo to query.
2138  *
2139  * Return: true if the vdo must be repaired.
2140  */
2141 static bool __must_check requires_repair(const struct vdo *vdo)
2142 {
2143 	switch (vdo_get_state(vdo)) {
2144 	case VDO_DIRTY:
2145 	case VDO_FORCE_REBUILD:
2146 	case VDO_REPLAYING:
2147 	case VDO_REBUILD_FOR_UPGRADE:
2148 		return true;
2149 
2150 	default:
2151 		return false;
2152 	}
2153 }
2154 
2155 /**
2156  * get_load_type() - Determine how the slab depot was loaded.
2157  * @vdo: The vdo.
2158  *
2159  * Return: How the depot was loaded.
2160  */
2161 static enum slab_depot_load_type get_load_type(struct vdo *vdo)
2162 {
2163 	if (vdo_state_requires_read_only_rebuild(vdo->load_state))
2164 		return VDO_SLAB_DEPOT_REBUILD_LOAD;
2165 
2166 	if (vdo_state_requires_recovery(vdo->load_state))
2167 		return VDO_SLAB_DEPOT_RECOVERY_LOAD;
2168 
2169 	return VDO_SLAB_DEPOT_NORMAL_LOAD;
2170 }
2171 
2172 /**
2173  * load_callback() - Callback to do the destructive parts of loading a VDO.
2174  * @completion: The sub-task completion.
2175  */
2176 static void load_callback(struct vdo_completion *completion)
2177 {
2178 	struct vdo *vdo = completion->vdo;
2179 	int result;
2180 
2181 	assert_admin_phase_thread(vdo, __func__);
2182 
2183 	switch (advance_phase(vdo)) {
2184 	case LOAD_PHASE_START:
2185 		result = vdo_start_operation(&vdo->admin.state, VDO_ADMIN_STATE_LOADING);
2186 		if (result != VDO_SUCCESS) {
2187 			vdo_continue_completion(completion, result);
2188 			return;
2189 		}
2190 
2191 		/* Prepare the recovery journal for new entries. */
2192 		vdo_open_recovery_journal(vdo->recovery_journal, vdo->depot,
2193 					  vdo->block_map);
2194 		vdo_allow_read_only_mode_entry(completion);
2195 		return;
2196 
2197 	case LOAD_PHASE_LOAD_DEPOT:
2198 		vdo_set_dedupe_state_normal(vdo->hash_zones);
2199 		if (vdo_is_read_only(vdo)) {
2200 			/*
2201 			 * In read-only mode we don't use the allocator and it may not even be
2202 			 * readable, so don't bother trying to load it.
2203 			 */
2204 			vdo_set_completion_result(completion, VDO_READ_ONLY);
2205 			break;
2206 		}
2207 
2208 		if (requires_repair(vdo)) {
2209 			vdo_repair(completion);
2210 			return;
2211 		}
2212 
2213 		vdo_load_slab_depot(vdo->depot,
2214 				    (was_new(vdo) ? VDO_ADMIN_STATE_FORMATTING :
2215 				     VDO_ADMIN_STATE_LOADING),
2216 				    completion, NULL);
2217 		return;
2218 
2219 	case LOAD_PHASE_MAKE_DIRTY:
2220 		vdo_set_state(vdo, VDO_DIRTY);
2221 		vdo_save_components(vdo, completion);
2222 		return;
2223 
2224 	case LOAD_PHASE_PREPARE_TO_ALLOCATE:
2225 		vdo_initialize_block_map_from_journal(vdo->block_map,
2226 						      vdo->recovery_journal);
2227 		vdo_prepare_slab_depot_to_allocate(vdo->depot, get_load_type(vdo),
2228 						   completion);
2229 		return;
2230 
2231 	case LOAD_PHASE_SCRUB_SLABS:
2232 		if (vdo_state_requires_recovery(vdo->load_state))
2233 			vdo_enter_recovery_mode(vdo);
2234 
2235 		vdo_scrub_all_unrecovered_slabs(vdo->depot, completion);
2236 		return;
2237 
2238 	case LOAD_PHASE_DATA_REDUCTION:
2239 		WRITE_ONCE(vdo->compressing, vdo->device_config->compression);
2240 		if (vdo->device_config->deduplication) {
2241 			/*
2242 			 * Don't try to load or rebuild the index first (and log scary error
2243 			 * messages) if this is known to be a newly-formatted volume.
2244 			 */
2245 			vdo_start_dedupe_index(vdo->hash_zones, was_new(vdo));
2246 		}
2247 
2248 		vdo->allocations_allowed = false;
2249 		fallthrough;
2250 
2251 	case LOAD_PHASE_FINISHED:
2252 		break;
2253 
2254 	case LOAD_PHASE_DRAIN_JOURNAL:
2255 		vdo_drain_recovery_journal(vdo->recovery_journal, VDO_ADMIN_STATE_SAVING,
2256 					   completion);
2257 		return;
2258 
2259 	case LOAD_PHASE_WAIT_FOR_READ_ONLY:
2260 		/* Avoid an infinite loop */
2261 		completion->error_handler = NULL;
2262 		vdo->admin.phase = LOAD_PHASE_FINISHED;
2263 		vdo_wait_until_not_entering_read_only_mode(completion);
2264 		return;
2265 
2266 	default:
2267 		vdo_set_completion_result(completion, UDS_BAD_STATE);
2268 	}
2269 
2270 	finish_operation_callback(completion);
2271 }
2272 
2273 /**
2274  * handle_load_error() - Handle an error during the load operation.
2275  * @completion: The admin completion.
2276  *
2277  * If at all possible, brings the vdo online in read-only mode. This handler is registered in
2278  * vdo_preresume_registered().
2279  */
2280 static void handle_load_error(struct vdo_completion *completion)
2281 {
2282 	struct vdo *vdo = completion->vdo;
2283 
2284 	if (vdo_requeue_completion_if_needed(completion,
2285 					     vdo->thread_config.admin_thread))
2286 		return;
2287 
2288 	if (vdo_state_requires_read_only_rebuild(vdo->load_state) &&
2289 	    (vdo->admin.phase == LOAD_PHASE_MAKE_DIRTY)) {
2290 		vdo_log_error_strerror(completion->result, "aborting load");
2291 		vdo->admin.phase = LOAD_PHASE_DRAIN_JOURNAL;
2292 		load_callback(vdo_forget(completion));
2293 		return;
2294 	}
2295 
2296 	vdo_log_error_strerror(completion->result,
2297 			       "Entering read-only mode due to load error");
2298 	vdo->admin.phase = LOAD_PHASE_WAIT_FOR_READ_ONLY;
2299 	vdo_enter_read_only_mode(vdo, completion->result);
2300 	completion->result = VDO_READ_ONLY;
2301 	load_callback(completion);
2302 }
2303 
2304 /**
2305  * write_super_block_for_resume() - Update the VDO state and save the super block.
2306  * @completion: The admin completion
2307  */
2308 static void write_super_block_for_resume(struct vdo_completion *completion)
2309 {
2310 	struct vdo *vdo = completion->vdo;
2311 
2312 	switch (vdo_get_state(vdo)) {
2313 	case VDO_CLEAN:
2314 	case VDO_NEW:
2315 		vdo_set_state(vdo, VDO_DIRTY);
2316 		vdo_save_components(vdo, completion);
2317 		return;
2318 
2319 	case VDO_DIRTY:
2320 	case VDO_READ_ONLY_MODE:
2321 	case VDO_FORCE_REBUILD:
2322 	case VDO_RECOVERING:
2323 	case VDO_REBUILD_FOR_UPGRADE:
2324 		/* No need to write the super block in these cases */
2325 		vdo_launch_completion(completion);
2326 		return;
2327 
2328 	case VDO_REPLAYING:
2329 	default:
2330 		vdo_continue_completion(completion, UDS_BAD_STATE);
2331 	}
2332 }
2333 
2334 /**
2335  * resume_callback() - Callback to resume a VDO.
2336  * @completion: The admin completion.
2337  */
2338 static void resume_callback(struct vdo_completion *completion)
2339 {
2340 	struct vdo *vdo = completion->vdo;
2341 	int result;
2342 
2343 	assert_admin_phase_thread(vdo, __func__);
2344 
2345 	switch (advance_phase(vdo)) {
2346 	case RESUME_PHASE_START:
2347 		result = vdo_start_operation(&vdo->admin.state,
2348 					     VDO_ADMIN_STATE_RESUMING);
2349 		if (result != VDO_SUCCESS) {
2350 			vdo_continue_completion(completion, result);
2351 			return;
2352 		}
2353 
2354 		write_super_block_for_resume(completion);
2355 		return;
2356 
2357 	case RESUME_PHASE_ALLOW_READ_ONLY_MODE:
2358 		vdo_allow_read_only_mode_entry(completion);
2359 		return;
2360 
2361 	case RESUME_PHASE_DEDUPE:
2362 		vdo_resume_hash_zones(vdo->hash_zones, completion);
2363 		return;
2364 
2365 	case RESUME_PHASE_DEPOT:
2366 		vdo_resume_slab_depot(vdo->depot, completion);
2367 		return;
2368 
2369 	case RESUME_PHASE_JOURNAL:
2370 		vdo_resume_recovery_journal(vdo->recovery_journal, completion);
2371 		return;
2372 
2373 	case RESUME_PHASE_BLOCK_MAP:
2374 		vdo_resume_block_map(vdo->block_map, completion);
2375 		return;
2376 
2377 	case RESUME_PHASE_LOGICAL_ZONES:
2378 		vdo_resume_logical_zones(vdo->logical_zones, completion);
2379 		return;
2380 
2381 	case RESUME_PHASE_PACKER:
2382 	{
2383 		bool was_enabled = vdo_get_compressing(vdo);
2384 		bool enable = vdo->device_config->compression;
2385 
2386 		if (enable != was_enabled)
2387 			WRITE_ONCE(vdo->compressing, enable);
2388 		vdo_log_info("compression is %s", (enable ? "enabled" : "disabled"));
2389 
2390 		vdo_resume_packer(vdo->packer, completion);
2391 		return;
2392 	}
2393 
2394 	case RESUME_PHASE_FLUSHER:
2395 		vdo_resume_flusher(vdo->flusher, completion);
2396 		return;
2397 
2398 	case RESUME_PHASE_DATA_VIOS:
2399 		resume_data_vio_pool(vdo->data_vio_pool, completion);
2400 		return;
2401 
2402 	case RESUME_PHASE_END:
2403 		break;
2404 
2405 	default:
2406 		vdo_set_completion_result(completion, UDS_BAD_STATE);
2407 	}
2408 
2409 	finish_operation_callback(completion);
2410 }
2411 
2412 /**
2413  * grow_logical_callback() - Callback to initiate a grow logical.
2414  * @completion: The admin completion.
2415  *
2416  * Registered in perform_grow_logical().
2417  */
2418 static void grow_logical_callback(struct vdo_completion *completion)
2419 {
2420 	struct vdo *vdo = completion->vdo;
2421 	int result;
2422 
2423 	assert_admin_phase_thread(vdo, __func__);
2424 
2425 	switch (advance_phase(vdo)) {
2426 	case GROW_LOGICAL_PHASE_START:
2427 		if (vdo_is_read_only(vdo)) {
2428 			vdo_log_error_strerror(VDO_READ_ONLY,
2429 					       "Can't grow logical size of a read-only VDO");
2430 			vdo_set_completion_result(completion, VDO_READ_ONLY);
2431 			break;
2432 		}
2433 
2434 		result = vdo_start_operation(&vdo->admin.state,
2435 					     VDO_ADMIN_STATE_SUSPENDED_OPERATION);
2436 		if (result != VDO_SUCCESS) {
2437 			vdo_continue_completion(completion, result);
2438 			return;
2439 		}
2440 
2441 		vdo->states.vdo.config.logical_blocks = vdo->block_map->next_entry_count;
2442 		vdo_save_components(vdo, completion);
2443 		return;
2444 
2445 	case GROW_LOGICAL_PHASE_GROW_BLOCK_MAP:
2446 		vdo_grow_block_map(vdo->block_map, completion);
2447 		return;
2448 
2449 	case GROW_LOGICAL_PHASE_END:
2450 		break;
2451 
2452 	case GROW_LOGICAL_PHASE_ERROR:
2453 		vdo_enter_read_only_mode(vdo, completion->result);
2454 		break;
2455 
2456 	default:
2457 		vdo_set_completion_result(completion, UDS_BAD_STATE);
2458 	}
2459 
2460 	finish_operation_callback(completion);
2461 }
2462 
2463 /**
2464  * handle_logical_growth_error() - Handle an error during the grow physical process.
2465  * @completion: The admin completion.
2466  */
2467 static void handle_logical_growth_error(struct vdo_completion *completion)
2468 {
2469 	struct vdo *vdo = completion->vdo;
2470 
2471 	if (vdo->admin.phase == GROW_LOGICAL_PHASE_GROW_BLOCK_MAP) {
2472 		/*
2473 		 * We've failed to write the new size in the super block, so set our in memory
2474 		 * config back to the old size.
2475 		 */
2476 		vdo->states.vdo.config.logical_blocks = vdo->block_map->entry_count;
2477 		vdo_abandon_block_map_growth(vdo->block_map);
2478 	}
2479 
2480 	vdo->admin.phase = GROW_LOGICAL_PHASE_ERROR;
2481 	grow_logical_callback(completion);
2482 }
2483 
2484 /**
2485  * perform_grow_logical() - Grow the logical size of the vdo.
2486  * @vdo: The vdo to grow.
2487  * @new_logical_blocks: The size to which the vdo should be grown.
2488  *
2489  * Context: This method may only be called when the vdo has been suspended and must not be called
2490  * from a base thread.
2491  *
2492  * Return: VDO_SUCCESS or an error.
2493  */
2494 static int perform_grow_logical(struct vdo *vdo, block_count_t new_logical_blocks)
2495 {
2496 	int result;
2497 
2498 	if (vdo->device_config->logical_blocks == new_logical_blocks) {
2499 		/*
2500 		 * A table was loaded for which we prepared to grow, but a table without that
2501 		 * growth was what we are resuming with.
2502 		 */
2503 		vdo_abandon_block_map_growth(vdo->block_map);
2504 		return VDO_SUCCESS;
2505 	}
2506 
2507 	vdo_log_info("Resizing logical to %llu",
2508 		     (unsigned long long) new_logical_blocks);
2509 	if (vdo->block_map->next_entry_count != new_logical_blocks)
2510 		return VDO_PARAMETER_MISMATCH;
2511 
2512 	result = perform_admin_operation(vdo, GROW_LOGICAL_PHASE_START,
2513 					 grow_logical_callback,
2514 					 handle_logical_growth_error, "grow logical");
2515 	if (result != VDO_SUCCESS)
2516 		return result;
2517 
2518 	vdo_log_info("Logical blocks now %llu", (unsigned long long) new_logical_blocks);
2519 	return VDO_SUCCESS;
2520 }
2521 
2522 static void copy_callback(int read_err, unsigned long write_err, void *context)
2523 {
2524 	struct vdo_completion *completion = context;
2525 	int result = (((read_err == 0) && (write_err == 0)) ? VDO_SUCCESS : -EIO);
2526 
2527 	vdo_continue_completion(completion, result);
2528 }
2529 
2530 static void partition_to_region(struct partition *partition, struct vdo *vdo,
2531 				struct dm_io_region *region)
2532 {
2533 	physical_block_number_t pbn = partition->offset - vdo->geometry.bio_offset;
2534 
2535 	*region = (struct dm_io_region) {
2536 		.bdev = vdo_get_backing_device(vdo),
2537 		.sector = pbn * VDO_SECTORS_PER_BLOCK,
2538 		.count = partition->count * VDO_SECTORS_PER_BLOCK,
2539 	};
2540 }
2541 
2542 /**
2543  * copy_partition() - Copy a partition from the location specified in the current layout to that in
2544  *                    the next layout.
2545  * @vdo: The vdo preparing to grow.
2546  * @id: The ID of the partition to copy.
2547  * @parent: The completion to notify when the copy is complete.
2548  */
2549 static void copy_partition(struct vdo *vdo, enum partition_id id,
2550 			   struct vdo_completion *parent)
2551 {
2552 	struct dm_io_region read_region, write_regions[1];
2553 	struct partition *from = vdo_get_known_partition(&vdo->layout, id);
2554 	struct partition *to = vdo_get_known_partition(&vdo->next_layout, id);
2555 
2556 	partition_to_region(from, vdo, &read_region);
2557 	partition_to_region(to, vdo, &write_regions[0]);
2558 	dm_kcopyd_copy(vdo->partition_copier, &read_region, 1, write_regions, 0,
2559 		       copy_callback, parent);
2560 }
2561 
2562 /**
2563  * grow_physical_callback() - Callback to initiate a grow physical.
2564  * @completion: The admin completion.
2565  *
2566  * Registered in perform_grow_physical().
2567  */
2568 static void grow_physical_callback(struct vdo_completion *completion)
2569 {
2570 	struct vdo *vdo = completion->vdo;
2571 	int result;
2572 
2573 	assert_admin_phase_thread(vdo, __func__);
2574 
2575 	switch (advance_phase(vdo)) {
2576 	case GROW_PHYSICAL_PHASE_START:
2577 		if (vdo_is_read_only(vdo)) {
2578 			vdo_log_error_strerror(VDO_READ_ONLY,
2579 					       "Can't grow physical size of a read-only VDO");
2580 			vdo_set_completion_result(completion, VDO_READ_ONLY);
2581 			break;
2582 		}
2583 
2584 		result = vdo_start_operation(&vdo->admin.state,
2585 					     VDO_ADMIN_STATE_SUSPENDED_OPERATION);
2586 		if (result != VDO_SUCCESS) {
2587 			vdo_continue_completion(completion, result);
2588 			return;
2589 		}
2590 
2591 		/* Copy the journal into the new layout. */
2592 		copy_partition(vdo, VDO_RECOVERY_JOURNAL_PARTITION, completion);
2593 		return;
2594 
2595 	case GROW_PHYSICAL_PHASE_COPY_SUMMARY:
2596 		copy_partition(vdo, VDO_SLAB_SUMMARY_PARTITION, completion);
2597 		return;
2598 
2599 	case GROW_PHYSICAL_PHASE_UPDATE_COMPONENTS:
2600 		vdo_uninitialize_layout(&vdo->layout);
2601 		vdo->layout = vdo->next_layout;
2602 		vdo_forget(vdo->next_layout.head);
2603 		vdo->states.vdo.config.physical_blocks = vdo->layout.size;
2604 		vdo_update_slab_depot_size(vdo->depot);
2605 		vdo_save_components(vdo, completion);
2606 		return;
2607 
2608 	case GROW_PHYSICAL_PHASE_USE_NEW_SLABS:
2609 		vdo_use_new_slabs(vdo->depot, completion);
2610 		return;
2611 
2612 	case GROW_PHYSICAL_PHASE_END:
2613 		vdo->depot->summary_origin =
2614 			vdo_get_known_partition(&vdo->layout,
2615 						VDO_SLAB_SUMMARY_PARTITION)->offset;
2616 		vdo->recovery_journal->origin =
2617 			vdo_get_known_partition(&vdo->layout,
2618 						VDO_RECOVERY_JOURNAL_PARTITION)->offset;
2619 		break;
2620 
2621 	case GROW_PHYSICAL_PHASE_ERROR:
2622 		vdo_enter_read_only_mode(vdo, completion->result);
2623 		break;
2624 
2625 	default:
2626 		vdo_set_completion_result(completion, UDS_BAD_STATE);
2627 	}
2628 
2629 	vdo_uninitialize_layout(&vdo->next_layout);
2630 	finish_operation_callback(completion);
2631 }
2632 
2633 /**
2634  * handle_physical_growth_error() - Handle an error during the grow physical process.
2635  * @completion: The sub-task completion.
2636  */
2637 static void handle_physical_growth_error(struct vdo_completion *completion)
2638 {
2639 	completion->vdo->admin.phase = GROW_PHYSICAL_PHASE_ERROR;
2640 	grow_physical_callback(completion);
2641 }
2642 
2643 /**
2644  * perform_grow_physical() - Grow the physical size of the vdo.
2645  * @vdo: The vdo to resize.
2646  * @new_physical_blocks: The new physical size in blocks.
2647  *
2648  * Context: This method may only be called when the vdo has been suspended and must not be called
2649  * from a base thread.
2650  *
2651  * Return: VDO_SUCCESS or an error.
2652  */
2653 static int perform_grow_physical(struct vdo *vdo, block_count_t new_physical_blocks)
2654 {
2655 	int result;
2656 	block_count_t new_depot_size, prepared_depot_size;
2657 	block_count_t old_physical_blocks = vdo->states.vdo.config.physical_blocks;
2658 
2659 	/* Skip any noop grows. */
2660 	if (old_physical_blocks == new_physical_blocks)
2661 		return VDO_SUCCESS;
2662 
2663 	if (new_physical_blocks != vdo->next_layout.size) {
2664 		/*
2665 		 * Either the VDO isn't prepared to grow, or it was prepared to grow to a different
2666 		 * size. Doing this check here relies on the fact that the call to this method is
2667 		 * done under the dmsetup message lock.
2668 		 */
2669 		vdo_uninitialize_layout(&vdo->next_layout);
2670 		vdo_abandon_new_slabs(vdo->depot);
2671 		return VDO_PARAMETER_MISMATCH;
2672 	}
2673 
2674 	/* Validate that we are prepared to grow appropriately. */
2675 	new_depot_size =
2676 		vdo_get_known_partition(&vdo->next_layout, VDO_SLAB_DEPOT_PARTITION)->count;
2677 	prepared_depot_size = (vdo->depot->new_slabs == NULL) ? 0 : vdo->depot->new_size;
2678 	if (prepared_depot_size != new_depot_size)
2679 		return VDO_PARAMETER_MISMATCH;
2680 
2681 	result = perform_admin_operation(vdo, GROW_PHYSICAL_PHASE_START,
2682 					 grow_physical_callback,
2683 					 handle_physical_growth_error, "grow physical");
2684 	if (result != VDO_SUCCESS)
2685 		return result;
2686 
2687 	vdo_log_info("Physical block count was %llu, now %llu",
2688 		     (unsigned long long) old_physical_blocks,
2689 		     (unsigned long long) new_physical_blocks);
2690 	return VDO_SUCCESS;
2691 }
2692 
2693 /**
2694  * apply_new_vdo_configuration() - Attempt to make any configuration changes from the table being
2695  *                                 resumed.
2696  * @vdo: The vdo being resumed.
2697  * @config: The new device configuration derived from the table with which the vdo is being
2698  *          resumed.
2699  *
2700  * Return: VDO_SUCCESS or an error.
2701  */
2702 static int __must_check apply_new_vdo_configuration(struct vdo *vdo,
2703 						    struct device_config *config)
2704 {
2705 	int result;
2706 
2707 	result = perform_grow_logical(vdo, config->logical_blocks);
2708 	if (result != VDO_SUCCESS) {
2709 		vdo_log_error("grow logical operation failed, result = %d", result);
2710 		return result;
2711 	}
2712 
2713 	result = perform_grow_physical(vdo, config->physical_blocks);
2714 	if (result != VDO_SUCCESS)
2715 		vdo_log_error("resize operation failed, result = %d", result);
2716 
2717 	return result;
2718 }
2719 
2720 static int vdo_preresume_registered(struct dm_target *ti, struct vdo *vdo)
2721 {
2722 	struct device_config *config = ti->private;
2723 	const char *device_name = vdo_get_device_name(ti);
2724 	block_count_t backing_blocks;
2725 	int result;
2726 
2727 	backing_blocks = get_underlying_device_block_count(vdo);
2728 	if (backing_blocks < config->physical_blocks) {
2729 		/* FIXME: can this still happen? */
2730 		vdo_log_error("resume of device '%s' failed: backing device has %llu blocks but VDO physical size is %llu blocks",
2731 			      device_name, (unsigned long long) backing_blocks,
2732 			      (unsigned long long) config->physical_blocks);
2733 		return -EINVAL;
2734 	}
2735 
2736 	if (vdo_get_admin_state(vdo) == VDO_ADMIN_STATE_PRE_LOADED) {
2737 		vdo_log_info("starting device '%s'", device_name);
2738 		result = perform_admin_operation(vdo, LOAD_PHASE_START, load_callback,
2739 						 handle_load_error, "load");
2740 		if ((result != VDO_SUCCESS) && (result != VDO_READ_ONLY)) {
2741 			/*
2742 			 * Something has gone very wrong. Make sure everything has drained and
2743 			 * leave the device in an unresumable state.
2744 			 */
2745 			vdo_log_error_strerror(result,
2746 					       "Start failed, could not load VDO metadata");
2747 			vdo->suspend_type = VDO_ADMIN_STATE_STOPPING;
2748 			perform_admin_operation(vdo, SUSPEND_PHASE_START,
2749 						suspend_callback, suspend_callback,
2750 						"suspend");
2751 			return result;
2752 		}
2753 
2754 		/* Even if the VDO is read-only, it is now able to handle read requests. */
2755 		vdo_log_info("device '%s' started", device_name);
2756 	}
2757 
2758 	vdo_log_info("resuming device '%s'", device_name);
2759 
2760 	/* If this fails, the VDO was not in a state to be resumed. This should never happen. */
2761 	result = apply_new_vdo_configuration(vdo, config);
2762 	BUG_ON(result == VDO_INVALID_ADMIN_STATE);
2763 
2764 	/*
2765 	 * Now that we've tried to modify the vdo, the new config *is* the config, whether the
2766 	 * modifications worked or not.
2767 	 */
2768 	vdo->device_config = config;
2769 
2770 	/*
2771 	 * Any error here is highly unexpected and the state of the vdo is questionable, so we mark
2772 	 * it read-only in memory. Because we are suspended, the read-only state will not be
2773 	 * written to disk.
2774 	 */
2775 	if (result != VDO_SUCCESS) {
2776 		vdo_log_error_strerror(result,
2777 				       "Commit of modifications to device '%s' failed",
2778 				       device_name);
2779 		vdo_enter_read_only_mode(vdo, result);
2780 		return result;
2781 	}
2782 
2783 	if (vdo_get_admin_state(vdo)->normal) {
2784 		/* The VDO was just started, so we don't need to resume it. */
2785 		return VDO_SUCCESS;
2786 	}
2787 
2788 	result = perform_admin_operation(vdo, RESUME_PHASE_START, resume_callback,
2789 					 resume_callback, "resume");
2790 	BUG_ON(result == VDO_INVALID_ADMIN_STATE);
2791 	if (result == VDO_READ_ONLY) {
2792 		/* Even if the vdo is read-only, it has still resumed. */
2793 		result = VDO_SUCCESS;
2794 	}
2795 
2796 	if (result != VDO_SUCCESS)
2797 		vdo_log_error("resume of device '%s' failed with error: %d", device_name,
2798 			      result);
2799 
2800 	return result;
2801 }
2802 
2803 static int vdo_preresume(struct dm_target *ti)
2804 {
2805 	struct registered_thread instance_thread;
2806 	struct vdo *vdo = get_vdo_for_target(ti);
2807 	int result;
2808 
2809 	vdo_register_thread_device_id(&instance_thread, &vdo->instance);
2810 	result = vdo_preresume_registered(ti, vdo);
2811 	if ((result == VDO_PARAMETER_MISMATCH) || (result == VDO_INVALID_ADMIN_STATE))
2812 		result = -EINVAL;
2813 	vdo_unregister_thread_device_id();
2814 	return vdo_status_to_errno(result);
2815 }
2816 
2817 static void vdo_resume(struct dm_target *ti)
2818 {
2819 	struct registered_thread instance_thread;
2820 
2821 	vdo_register_thread_device_id(&instance_thread,
2822 				      &get_vdo_for_target(ti)->instance);
2823 	vdo_log_info("device '%s' resumed", vdo_get_device_name(ti));
2824 	vdo_unregister_thread_device_id();
2825 }
2826 
2827 /*
2828  * If anything changes that affects how user tools will interact with vdo, update the version
2829  * number and make sure documentation about the change is complete so tools can properly update
2830  * their management code.
2831  */
2832 static struct target_type vdo_target_bio = {
2833 	.features = DM_TARGET_SINGLETON,
2834 	.name = "vdo",
2835 	.version = { 9, 0, 0 },
2836 	.module = THIS_MODULE,
2837 	.ctr = vdo_ctr,
2838 	.dtr = vdo_dtr,
2839 	.io_hints = vdo_io_hints,
2840 	.iterate_devices = vdo_iterate_devices,
2841 	.map = vdo_map_bio,
2842 	.message = vdo_message,
2843 	.status = vdo_status,
2844 	.presuspend = vdo_presuspend,
2845 	.postsuspend = vdo_postsuspend,
2846 	.preresume = vdo_preresume,
2847 	.resume = vdo_resume,
2848 };
2849 
2850 static bool dm_registered;
2851 
2852 static void vdo_module_destroy(void)
2853 {
2854 	vdo_log_debug("unloading");
2855 
2856 	if (dm_registered)
2857 		dm_unregister_target(&vdo_target_bio);
2858 
2859 	VDO_ASSERT_LOG_ONLY(instances.count == 0,
2860 			    "should have no instance numbers still in use, but have %u",
2861 			    instances.count);
2862 	vdo_free(instances.words);
2863 	memset(&instances, 0, sizeof(struct instance_tracker));
2864 }
2865 
2866 static int __init vdo_init(void)
2867 {
2868 	int result = 0;
2869 
2870 	/* Memory tracking must be initialized first for accurate accounting. */
2871 	vdo_memory_init();
2872 	vdo_initialize_threads_mutex();
2873 	vdo_initialize_thread_device_registry();
2874 	vdo_initialize_device_registry_once();
2875 
2876 	/* Add VDO errors to the set of errors registered by the indexer. */
2877 	result = vdo_register_status_codes();
2878 	if (result != VDO_SUCCESS) {
2879 		vdo_log_error("vdo_register_status_codes failed %d", result);
2880 		vdo_module_destroy();
2881 		return result;
2882 	}
2883 
2884 	result = dm_register_target(&vdo_target_bio);
2885 	if (result < 0) {
2886 		vdo_log_error("dm_register_target failed %d", result);
2887 		vdo_module_destroy();
2888 		return result;
2889 	}
2890 	dm_registered = true;
2891 
2892 	return result;
2893 }
2894 
2895 static void __exit vdo_exit(void)
2896 {
2897 	vdo_module_destroy();
2898 	/* Memory tracking cleanup must be done last. */
2899 	vdo_memory_exit();
2900 }
2901 
2902 module_init(vdo_init);
2903 module_exit(vdo_exit);
2904 
2905 module_param_named(log_level, vdo_log_level, uint, 0644);
2906 MODULE_PARM_DESC(log_level, "Log level for log messages");
2907 
2908 MODULE_DESCRIPTION(DM_NAME " target for transparent deduplication");
2909 MODULE_AUTHOR("Red Hat, Inc.");
2910 MODULE_LICENSE("GPL");
2911