xref: /linux/drivers/md/dm-vdo/vdo.c (revision d358e5254674b70f34c847715ca509e46eb81e6f)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 /*
7  * This file contains the main entry points for normal operations on a vdo as well as functions for
8  * constructing and destroying vdo instances (in memory).
9  */
10 
11 /**
12  * DOC:
13  *
14  * A read_only_notifier has a single completion which is used to perform read-only notifications,
15  * however, vdo_enter_read_only_mode() may be called from any thread. A pair of fields, protected
16  * by a spinlock, are used to control the read-only mode entry process. The first field holds the
17  * read-only error. The second is the state field, which may hold any of the four special values
18  * enumerated here.
19  *
20  * When vdo_enter_read_only_mode() is called from some vdo thread, if the read_only_error field
21  * already contains an error (i.e. its value is not VDO_SUCCESS), then some other error has already
22  * initiated the read-only process, and nothing more is done. Otherwise, the new error is stored in
23  * the read_only_error field, and the state field is consulted. If the state is MAY_NOTIFY, it is
24  * set to NOTIFYING, and the notification process begins. If the state is MAY_NOT_NOTIFY, then
25  * notifications are currently disallowed, generally due to the vdo being suspended. In this case,
26  * nothing more will be done until the vdo is resumed, at which point the notification will be
27  * performed. In any other case, the vdo is already read-only, and there is nothing more to do.
28  */
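
/*
 * Illustrative sketch (editorial example, not part of the driver): any
 * component which detects an unrecoverable error can start the process
 * described above by calling vdo_enter_read_only_mode() (defined later in
 * this file). Repeated calls are harmless, since only the first recorded
 * error starts a notification. The helper named below is hypothetical.
 *
 *	int result = update_some_metadata(vdo);
 *
 *	if (result != VDO_SUCCESS)
 *		vdo_enter_read_only_mode(vdo, result);
 */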
29 
30 #include "vdo.h"
31 
32 #include <linux/completion.h>
33 #include <linux/device-mapper.h>
34 #include <linux/lz4.h>
35 #include <linux/mutex.h>
36 #include <linux/spinlock.h>
37 #include <linux/types.h>
38 
39 #include "logger.h"
40 #include "memory-alloc.h"
41 #include "permassert.h"
42 #include "string-utils.h"
43 
44 #include "block-map.h"
45 #include "completion.h"
46 #include "data-vio.h"
47 #include "dedupe.h"
48 #include "encodings.h"
49 #include "funnel-workqueue.h"
50 #include "io-submitter.h"
51 #include "logical-zone.h"
52 #include "packer.h"
53 #include "physical-zone.h"
54 #include "recovery-journal.h"
55 #include "slab-depot.h"
56 #include "statistics.h"
57 #include "status-codes.h"
58 #include "vio.h"
59 
60 #define PARANOID_THREAD_CONSISTENCY_CHECKS 0
61 
62 struct sync_completion {
63 	struct vdo_completion vdo_completion;
64 	struct completion completion;
65 };
66 
67 /* A linked list is adequate for the small number of entries we expect. */
68 struct device_registry {
69 	struct list_head links;
70 	/* TODO: Convert to rcu per kernel recommendation. */
71 	rwlock_t lock;
72 };
73 
74 static struct device_registry registry;
75 
76 /**
77  * vdo_initialize_device_registry_once() - Initialize the necessary structures for the device
78  *                                         registry.
79  */
80 void vdo_initialize_device_registry_once(void)
81 {
82 	INIT_LIST_HEAD(&registry.links);
83 	rwlock_init(&registry.lock);
84 }
85 
86 /** vdo_is_equal() - Implements vdo_filter_fn. */
87 static bool vdo_is_equal(struct vdo *vdo, const void *context)
88 {
89 	return (vdo == context);
90 }
91 
92 /**
93  * filter_vdos_locked() - Find a vdo in the registry if it exists there.
94  * @filter: The filter function to apply to devices.
95  * @context: A bit of context to provide to the filter.
96  *
97  * Context: Must be called holding the lock.
98  *
99  * Return: the vdo object found, if any.
100  */
101 static struct vdo * __must_check filter_vdos_locked(vdo_filter_fn filter,
102 						    const void *context)
103 {
104 	struct vdo *vdo;
105 
106 	list_for_each_entry(vdo, &registry.links, registration) {
107 		if (filter(vdo, context))
108 			return vdo;
109 	}
110 
111 	return NULL;
112 }
113 
114 /**
115  * vdo_find_matching() - Find and return the first (if any) vdo matching a given filter function.
116  * @filter: The filter function to apply to vdos.
117  * @context: A bit of context to provide to the filter.
118  */
119 struct vdo *vdo_find_matching(vdo_filter_fn filter, const void *context)
120 {
121 	struct vdo *vdo;
122 
123 	read_lock(&registry.lock);
124 	vdo = filter_vdos_locked(filter, context);
125 	read_unlock(&registry.lock);
126 
127 	return vdo;
128 }
129 
130 static void start_vdo_request_queue(void *ptr)
131 {
132 	struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue());
133 
134 	vdo_register_allocating_thread(&thread->allocating_thread,
135 				       &thread->vdo->allocations_allowed);
136 }
137 
138 static void finish_vdo_request_queue(void *ptr)
139 {
140 	vdo_unregister_allocating_thread();
141 }
142 
143 static const struct vdo_work_queue_type default_queue_type = {
144 	.start = start_vdo_request_queue,
145 	.finish = finish_vdo_request_queue,
146 	.max_priority = VDO_DEFAULT_Q_MAX_PRIORITY,
147 	.default_priority = VDO_DEFAULT_Q_COMPLETION_PRIORITY,
148 };
149 
150 static const struct vdo_work_queue_type bio_ack_q_type = {
151 	.start = NULL,
152 	.finish = NULL,
153 	.max_priority = BIO_ACK_Q_MAX_PRIORITY,
154 	.default_priority = BIO_ACK_Q_ACK_PRIORITY,
155 };
156 
157 static const struct vdo_work_queue_type cpu_q_type = {
158 	.start = NULL,
159 	.finish = NULL,
160 	.max_priority = CPU_Q_MAX_PRIORITY,
161 	.default_priority = CPU_Q_MAX_PRIORITY,
162 };
163 
164 static void uninitialize_thread_config(struct thread_config *config)
165 {
166 	vdo_free(vdo_forget(config->logical_threads));
167 	vdo_free(vdo_forget(config->physical_threads));
168 	vdo_free(vdo_forget(config->hash_zone_threads));
169 	vdo_free(vdo_forget(config->bio_threads));
170 	memset(config, 0, sizeof(struct thread_config));
171 }
172 
173 static void assign_thread_ids(struct thread_config *config,
174 			      thread_id_t thread_ids[], zone_count_t count)
175 {
176 	zone_count_t zone;
177 
178 	for (zone = 0; zone < count; zone++)
179 		thread_ids[zone] = config->thread_count++;
180 }
181 
182 /**
183  * initialize_thread_config() - Initialize the thread mapping
184  * @counts: The number and types of threads to create.
185  * @config: The thread_config to initialize.
186  *
187  * If the logical, physical, and hash zone counts are all 0, a single thread will be shared by all
188  * three plus the packer and recovery journal. Otherwise, there must be at least one of each type,
189  * and each will have its own thread, as will the packer and recovery journal.
190  *
191  * Return: VDO_SUCCESS or an error.
192  */
193 static int __must_check initialize_thread_config(struct thread_count_config counts,
194 						 struct thread_config *config)
195 {
196 	int result;
197 	bool single = ((counts.logical_zones + counts.physical_zones + counts.hash_zones) == 0);
198 
199 	config->bio_thread_count = counts.bio_threads;
200 	if (single) {
201 		config->logical_zone_count = 1;
202 		config->physical_zone_count = 1;
203 		config->hash_zone_count = 1;
204 	} else {
205 		config->logical_zone_count = counts.logical_zones;
206 		config->physical_zone_count = counts.physical_zones;
207 		config->hash_zone_count = counts.hash_zones;
208 	}
209 
210 	result = vdo_allocate(config->logical_zone_count, thread_id_t,
211 			      "logical thread array", &config->logical_threads);
212 	if (result != VDO_SUCCESS) {
213 		uninitialize_thread_config(config);
214 		return result;
215 	}
216 
217 	result = vdo_allocate(config->physical_zone_count, thread_id_t,
218 			      "physical thread array", &config->physical_threads);
219 	if (result != VDO_SUCCESS) {
220 		uninitialize_thread_config(config);
221 		return result;
222 	}
223 
224 	result = vdo_allocate(config->hash_zone_count, thread_id_t,
225 			      "hash thread array", &config->hash_zone_threads);
226 	if (result != VDO_SUCCESS) {
227 		uninitialize_thread_config(config);
228 		return result;
229 	}
230 
231 	result = vdo_allocate(config->bio_thread_count, thread_id_t,
232 			      "bio thread array", &config->bio_threads);
233 	if (result != VDO_SUCCESS) {
234 		uninitialize_thread_config(config);
235 		return result;
236 	}
237 
238 	if (single) {
239 		config->logical_threads[0] = config->thread_count;
240 		config->physical_threads[0] = config->thread_count;
241 		config->hash_zone_threads[0] = config->thread_count++;
242 	} else {
243 		config->admin_thread = config->thread_count;
244 		config->journal_thread = config->thread_count++;
245 		config->packer_thread = config->thread_count++;
246 		assign_thread_ids(config, config->logical_threads, counts.logical_zones);
247 		assign_thread_ids(config, config->physical_threads, counts.physical_zones);
248 		assign_thread_ids(config, config->hash_zone_threads, counts.hash_zones);
249 	}
250 
251 	config->dedupe_thread = config->thread_count++;
252 	config->bio_ack_thread =
253 		((counts.bio_ack_threads > 0) ? config->thread_count++ : VDO_INVALID_THREAD_ID);
254 	config->cpu_thread = config->thread_count++;
255 	assign_thread_ids(config, config->bio_threads, counts.bio_threads);
256 	return VDO_SUCCESS;
257 }
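
/*
 * Worked example (editorial illustration of the multi-threaded case above):
 * with 2 logical, 2 physical, and 1 hash zone, plus 2 bio threads, 1 bio-ack
 * thread, and 1 CPU thread, the assignments come out as:
 *
 *	admin = journal = 0, packer = 1,
 *	logical = { 2, 3 }, physical = { 4, 5 }, hash_zone = { 6 },
 *	dedupe = 7, bio_ack = 8, cpu = 9, bio = { 10, 11 }
 *
 * for a total thread_count of 12.
 */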
258 
259 /**
260  * read_geometry_block() - Synchronously read the geometry block from a vdo's underlying block
261  *                         device.
262  * @vdo: The vdo whose geometry is to be read.
263  *
264  * Return: VDO_SUCCESS or an error code.
265  */
266 static int __must_check read_geometry_block(struct vdo *vdo)
267 {
268 	struct vio *vio;
269 	char *block;
270 	int result;
271 
272 	result = vdo_allocate(VDO_BLOCK_SIZE, u8, __func__, &block);
273 	if (result != VDO_SUCCESS)
274 		return result;
275 
276 	result = create_metadata_vio(vdo, VIO_TYPE_GEOMETRY, VIO_PRIORITY_HIGH, NULL,
277 				     block, &vio);
278 	if (result != VDO_SUCCESS) {
279 		vdo_free(block);
280 		return result;
281 	}
282 
283 	/*
284 	 * This is only safe because, having not already loaded the geometry, the vdo's geometry's
285 	 * bio_offset field is 0, so the fact that vio_reset_bio() will subtract that offset from
286 	 * the supplied pbn is not a problem.
287 	 */
288 	result = vio_reset_bio(vio, block, NULL, REQ_OP_READ,
289 			       VDO_GEOMETRY_BLOCK_LOCATION);
290 	if (result != VDO_SUCCESS) {
291 		free_vio(vdo_forget(vio));
292 		vdo_free(block);
293 		return result;
294 	}
295 
296 	bio_set_dev(vio->bio, vdo_get_backing_device(vdo));
297 	submit_bio_wait(vio->bio);
298 	result = blk_status_to_errno(vio->bio->bi_status);
299 	free_vio(vdo_forget(vio));
300 	if (result != 0) {
301 		vdo_log_error_strerror(result, "synchronous read failed");
302 		vdo_free(block);
303 		return -EIO;
304 	}
305 
306 	result = vdo_parse_geometry_block((u8 *) block, &vdo->geometry);
307 	vdo_free(block);
308 	return result;
309 }
310 
311 static bool get_zone_thread_name(const thread_id_t thread_ids[], zone_count_t count,
312 				 thread_id_t id, const char *prefix,
313 				 char *buffer, size_t buffer_length)
314 {
315 	if (id >= thread_ids[0]) {
316 		thread_id_t index = id - thread_ids[0];
317 
318 		if (index < count) {
319 			snprintf(buffer, buffer_length, "%s%d", prefix, index);
320 			return true;
321 		}
322 	}
323 
324 	return false;
325 }
326 
327 /**
328  * get_thread_name() - Format the name of the worker thread desired to support a given work queue.
329  * @thread_config: The thread configuration.
330  * @thread_id: The thread id.
331  * @buffer: Where to put the formatted name.
332  * @buffer_length: Size of the output buffer.
333  *
334  * The physical layer may add a prefix identifying the product; the output from this function
335  * should just identify the thread.
336  */
337 static void get_thread_name(const struct thread_config *thread_config,
338 			    thread_id_t thread_id, char *buffer, size_t buffer_length)
339 {
340 	if (thread_id == thread_config->journal_thread) {
341 		if (thread_config->packer_thread == thread_id) {
342 			/*
343 			 * This is the "single thread" config where one thread is used for the
344 			 * journal, packer, logical, physical, and hash zones. In that case, it is
345 			 * known as the "request queue."
346 			 */
347 			snprintf(buffer, buffer_length, "reqQ");
348 			return;
349 		}
350 
351 		snprintf(buffer, buffer_length, "journalQ");
352 		return;
353 	} else if (thread_id == thread_config->admin_thread) {
354 		/* Theoretically this could be different from the journal thread. */
355 		snprintf(buffer, buffer_length, "adminQ");
356 		return;
357 	} else if (thread_id == thread_config->packer_thread) {
358 		snprintf(buffer, buffer_length, "packerQ");
359 		return;
360 	} else if (thread_id == thread_config->dedupe_thread) {
361 		snprintf(buffer, buffer_length, "dedupeQ");
362 		return;
363 	} else if (thread_id == thread_config->bio_ack_thread) {
364 		snprintf(buffer, buffer_length, "ackQ");
365 		return;
366 	} else if (thread_id == thread_config->cpu_thread) {
367 		snprintf(buffer, buffer_length, "cpuQ");
368 		return;
369 	}
370 
371 	if (get_zone_thread_name(thread_config->logical_threads,
372 				 thread_config->logical_zone_count,
373 				 thread_id, "logQ", buffer, buffer_length))
374 		return;
375 
376 	if (get_zone_thread_name(thread_config->physical_threads,
377 				 thread_config->physical_zone_count,
378 				 thread_id, "physQ", buffer, buffer_length))
379 		return;
380 
381 	if (get_zone_thread_name(thread_config->hash_zone_threads,
382 				 thread_config->hash_zone_count,
383 				 thread_id, "hashQ", buffer, buffer_length))
384 		return;
385 
386 	if (get_zone_thread_name(thread_config->bio_threads,
387 				 thread_config->bio_thread_count,
388 				 thread_id, "bioQ", buffer, buffer_length))
389 		return;
390 
391 	/* Some sort of misconfiguration? */
392 	snprintf(buffer, buffer_length, "reqQ%d", thread_id);
393 }
394 
395 /**
396  * vdo_make_thread() - Construct a single vdo work_queue and its associated thread (or threads for
397  *                     round-robin queues).
398  * @vdo: The vdo which owns the thread.
399  * @thread_id: The id of the thread to create (as determined by the thread_config).
400  * @type: The description of the work queue for this thread.
401  * @queue_count: The number of actual threads/queues contained in the "thread".
402  * @contexts: An array of queue_count contexts, one for each individual queue; may be NULL.
403  *
404  * Each "thread" constructed by this method is represented by a unique thread id in the thread
405  * config, and completions can be enqueued to the queue and run on the threads comprising this
406  * entity.
407  *
408  * Return: VDO_SUCCESS or an error.
409  */
410 int vdo_make_thread(struct vdo *vdo, thread_id_t thread_id,
411 		    const struct vdo_work_queue_type *type,
412 		    unsigned int queue_count, void *contexts[])
413 {
414 	struct vdo_thread *thread = &vdo->threads[thread_id];
415 	char queue_name[MAX_VDO_WORK_QUEUE_NAME_LEN];
416 
417 	if (type == NULL)
418 		type = &default_queue_type;
419 
420 	if (thread->queue != NULL) {
421 		return VDO_ASSERT(vdo_work_queue_type_is(thread->queue, type),
422 				  "already constructed vdo thread %u is of the correct type",
423 				  thread_id);
424 	}
425 
426 	thread->vdo = vdo;
427 	thread->thread_id = thread_id;
428 	get_thread_name(&vdo->thread_config, thread_id, queue_name, sizeof(queue_name));
429 	return vdo_make_work_queue(vdo->thread_name_prefix, queue_name, thread,
430 				   type, queue_count, contexts, &thread->queue);
431 }
432 
433 /**
434  * register_vdo() - Register a VDO; it must not already be registered.
435  * @vdo: The vdo to register.
436  *
437  * Return: VDO_SUCCESS or an error.
438  */
439 static int register_vdo(struct vdo *vdo)
440 {
441 	int result;
442 
443 	write_lock(&registry.lock);
444 	result = VDO_ASSERT(filter_vdos_locked(vdo_is_equal, vdo) == NULL,
445 			    "VDO not already registered");
446 	if (result == VDO_SUCCESS) {
447 		INIT_LIST_HEAD(&vdo->registration);
448 		list_add_tail(&vdo->registration, &registry.links);
449 	}
450 	write_unlock(&registry.lock);
451 
452 	return result;
453 }
454 
455 /**
456  * initialize_vdo() - Do the portion of initializing a vdo which will clean up after itself on
457  *                    error.
458  * @vdo: The vdo being initialized
459  * @config: The configuration of the vdo
460  * @instance: The instance number of the vdo
461  * @reason: The buffer to hold the failure reason on error
462  */
463 static int initialize_vdo(struct vdo *vdo, struct device_config *config,
464 			  unsigned int instance, char **reason)
465 {
466 	int result;
467 	zone_count_t i;
468 
469 	vdo->device_config = config;
470 	vdo->starting_sector_offset = config->owning_target->begin;
471 	vdo->instance = instance;
472 	vdo->allocations_allowed = true;
473 	vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_NEW);
474 	INIT_LIST_HEAD(&vdo->device_config_list);
475 	vdo_initialize_completion(&vdo->admin.completion, vdo, VDO_ADMIN_COMPLETION);
476 	init_completion(&vdo->admin.callback_sync);
477 	mutex_init(&vdo->stats_mutex);
478 	result = read_geometry_block(vdo);
479 	if (result != VDO_SUCCESS) {
480 		*reason = "Could not load geometry block";
481 		return result;
482 	}
483 
484 	result = initialize_thread_config(config->thread_counts, &vdo->thread_config);
485 	if (result != VDO_SUCCESS) {
486 		*reason = "Cannot create thread configuration";
487 		return result;
488 	}
489 
490 	vdo_log_info("zones: %d logical, %d physical, %d hash; total threads: %d",
491 		     config->thread_counts.logical_zones,
492 		     config->thread_counts.physical_zones,
493 		     config->thread_counts.hash_zones, vdo->thread_config.thread_count);
494 
495 	/* Compression context storage */
496 	result = vdo_allocate(config->thread_counts.cpu_threads, char *, "LZ4 context",
497 			      &vdo->compression_context);
498 	if (result != VDO_SUCCESS) {
499 		*reason = "cannot allocate LZ4 context";
500 		return result;
501 	}
502 
503 	for (i = 0; i < config->thread_counts.cpu_threads; i++) {
504 		result = vdo_allocate(LZ4_MEM_COMPRESS, char, "LZ4 context",
505 				      &vdo->compression_context[i]);
506 		if (result != VDO_SUCCESS) {
507 			*reason = "cannot allocate LZ4 context";
508 			return result;
509 		}
510 	}
511 
512 	result = register_vdo(vdo);
513 	if (result != VDO_SUCCESS) {
514 		*reason = "Cannot add VDO to device registry";
515 		return result;
516 	}
517 
518 	vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_INITIALIZED);
519 	return result;
520 }
521 
522 /**
523  * vdo_make() - Allocate and initialize a vdo.
524  * @instance: Device instantiation counter.
525  * @config: The device configuration.
526  * @reason: The reason for any failure during this call.
527  * @vdo_ptr: A pointer to hold the created vdo.
528  *
529  * Return: VDO_SUCCESS or an error.
530  */
531 int vdo_make(unsigned int instance, struct device_config *config, char **reason,
532 	     struct vdo **vdo_ptr)
533 {
534 	int result;
535 	struct vdo *vdo;
536 
537 	/* Initialize with a generic failure reason to prevent returning garbage. */
538 	*reason = "Unspecified error";
539 
540 	result = vdo_allocate(1, struct vdo, __func__, &vdo);
541 	if (result != VDO_SUCCESS) {
542 		*reason = "Cannot allocate VDO";
543 		return result;
544 	}
545 
546 	result = initialize_vdo(vdo, config, instance, reason);
547 	if (result != VDO_SUCCESS) {
548 		vdo_destroy(vdo);
549 		return result;
550 	}
551 
552 	/* From here on, the caller will clean up if there is an error. */
553 	*vdo_ptr = vdo;
554 
555 	snprintf(vdo->thread_name_prefix, sizeof(vdo->thread_name_prefix),
556 		 "vdo%u", instance);
557 	result = vdo_allocate(vdo->thread_config.thread_count,
558 			      struct vdo_thread, __func__, &vdo->threads);
559 	if (result != VDO_SUCCESS) {
560 		*reason = "Cannot allocate thread structures";
561 		return result;
562 	}
563 
564 	result = vdo_make_thread(vdo, vdo->thread_config.admin_thread,
565 				 &default_queue_type, 1, NULL);
566 	if (result != VDO_SUCCESS) {
567 		*reason = "Cannot make admin thread";
568 		return result;
569 	}
570 
571 	result = vdo_make_flusher(vdo);
572 	if (result != VDO_SUCCESS) {
573 		*reason = "Cannot make flusher zones";
574 		return result;
575 	}
576 
577 	result = vdo_make_packer(vdo, DEFAULT_PACKER_BINS, &vdo->packer);
578 	if (result != VDO_SUCCESS) {
579 		*reason = "Cannot make packer zones";
580 		return result;
581 	}
582 
583 	BUG_ON(vdo->device_config->logical_block_size <= 0);
584 	BUG_ON(vdo->device_config->owned_device == NULL);
585 	result = make_data_vio_pool(vdo, MAXIMUM_VDO_USER_VIOS,
586 				    MAXIMUM_VDO_USER_VIOS * 3 / 4,
587 				    &vdo->data_vio_pool);
588 	if (result != VDO_SUCCESS) {
589 		*reason = "Cannot allocate data_vio pool";
590 		return result;
591 	}
592 
593 	result = vdo_make_io_submitter(config->thread_counts.bio_threads,
594 				       config->thread_counts.bio_rotation_interval,
595 				       get_data_vio_pool_request_limit(vdo->data_vio_pool),
596 				       vdo, &vdo->io_submitter);
597 	if (result != VDO_SUCCESS) {
598 		*reason = "bio submission initialization failed";
599 		return result;
600 	}
601 
602 	if (vdo_uses_bio_ack_queue(vdo)) {
603 		result = vdo_make_thread(vdo, vdo->thread_config.bio_ack_thread,
604 					 &bio_ack_q_type,
605 					 config->thread_counts.bio_ack_threads, NULL);
606 		if (result != VDO_SUCCESS) {
607 			*reason = "bio ack queue initialization failed";
608 			return result;
609 		}
610 	}
611 
612 	result = vdo_make_thread(vdo, vdo->thread_config.cpu_thread, &cpu_q_type,
613 				 config->thread_counts.cpu_threads,
614 				 (void **) vdo->compression_context);
615 	if (result != VDO_SUCCESS) {
616 		*reason = "CPU queue initialization failed";
617 		return result;
618 	}
619 
620 	return VDO_SUCCESS;
621 }
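
/*
 * Illustrative call pattern (editorial sketch; the real caller is the
 * device-mapper target constructor elsewhere in the driver). Starting with a
 * NULL pointer keeps cleanup uniform: vdo_make() only sets *vdo_ptr once the
 * caller owns cleanup, and vdo_destroy() accepts NULL.
 *
 *	struct vdo *vdo = NULL;
 *	char *failure_reason;
 *
 *	result = vdo_make(instance, config, &failure_reason, &vdo);
 *	if (result != VDO_SUCCESS) {
 *		vdo_log_error("Could not create VDO: %s", failure_reason);
 *		vdo_destroy(vdo);
 *	}
 */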
622 
623 static void finish_vdo(struct vdo *vdo)
624 {
625 	int i;
626 
627 	if (vdo->threads == NULL)
628 		return;
629 
630 	vdo_cleanup_io_submitter(vdo->io_submitter);
631 	vdo_finish_dedupe_index(vdo->hash_zones);
632 
633 	for (i = 0; i < vdo->thread_config.thread_count; i++)
634 		vdo_finish_work_queue(vdo->threads[i].queue);
635 }
636 
637 /**
638  * free_listeners() - Free the list of read-only listeners associated with a thread.
639  * @thread: The thread holding the list to free.
640  */
641 static void free_listeners(struct vdo_thread *thread)
642 {
643 	struct read_only_listener *listener, *next;
644 
645 	for (listener = vdo_forget(thread->listeners); listener != NULL; listener = next) {
646 		next = vdo_forget(listener->next);
647 		vdo_free(listener);
648 	}
649 }
650 
651 static void uninitialize_super_block(struct vdo_super_block *super_block)
652 {
653 	free_vio_components(&super_block->vio);
654 	vdo_free(super_block->buffer);
655 }
656 
657 /**
658  * unregister_vdo() - Remove a vdo from the device registry.
659  * @vdo: The vdo to remove.
660  */
661 static void unregister_vdo(struct vdo *vdo)
662 {
663 	write_lock(&registry.lock);
664 	if (filter_vdos_locked(vdo_is_equal, vdo) == vdo)
665 		list_del_init(&vdo->registration);
666 
667 	write_unlock(&registry.lock);
668 }
669 
670 /**
671  * vdo_destroy() - Destroy a vdo instance.
672  * @vdo: The vdo to destroy (may be NULL).
673  */
674 void vdo_destroy(struct vdo *vdo)
675 {
676 	unsigned int i;
677 
678 	if (vdo == NULL)
679 		return;
680 
681 	/* A running VDO should never be destroyed without suspending first. */
682 	BUG_ON(vdo_get_admin_state(vdo)->normal);
683 
684 	vdo->allocations_allowed = true;
685 
686 	finish_vdo(vdo);
687 	unregister_vdo(vdo);
688 	free_data_vio_pool(vdo->data_vio_pool);
689 	vdo_free_io_submitter(vdo_forget(vdo->io_submitter));
690 	vdo_free_flusher(vdo_forget(vdo->flusher));
691 	vdo_free_packer(vdo_forget(vdo->packer));
692 	vdo_free_recovery_journal(vdo_forget(vdo->recovery_journal));
693 	vdo_free_slab_depot(vdo_forget(vdo->depot));
694 	vdo_uninitialize_layout(&vdo->layout);
695 	vdo_uninitialize_layout(&vdo->next_layout);
696 	if (vdo->partition_copier)
697 		dm_kcopyd_client_destroy(vdo_forget(vdo->partition_copier));
698 	uninitialize_super_block(&vdo->super_block);
699 	vdo_free_block_map(vdo_forget(vdo->block_map));
700 	vdo_free_hash_zones(vdo_forget(vdo->hash_zones));
701 	vdo_free_physical_zones(vdo_forget(vdo->physical_zones));
702 	vdo_free_logical_zones(vdo_forget(vdo->logical_zones));
703 
704 	if (vdo->threads != NULL) {
705 		for (i = 0; i < vdo->thread_config.thread_count; i++) {
706 			free_listeners(&vdo->threads[i]);
707 			vdo_free_work_queue(vdo_forget(vdo->threads[i].queue));
708 		}
709 		vdo_free(vdo_forget(vdo->threads));
710 	}
711 
712 	uninitialize_thread_config(&vdo->thread_config);
713 
714 	if (vdo->compression_context != NULL) {
715 		for (i = 0; i < vdo->device_config->thread_counts.cpu_threads; i++)
716 			vdo_free(vdo_forget(vdo->compression_context[i]));
717 
718 		vdo_free(vdo_forget(vdo->compression_context));
719 	}
720 	vdo_free(vdo);
721 }
722 
723 static int initialize_super_block(struct vdo *vdo, struct vdo_super_block *super_block)
724 {
725 	int result;
726 
727 	result = vdo_allocate(VDO_BLOCK_SIZE, char, "encoded super block",
728 			      (char **) &vdo->super_block.buffer);
729 	if (result != VDO_SUCCESS)
730 		return result;
731 
732 	return allocate_vio_components(vdo, VIO_TYPE_SUPER_BLOCK,
733 				       VIO_PRIORITY_METADATA, NULL, 1,
734 				       (char *) super_block->buffer,
735 				       &vdo->super_block.vio);
736 }
737 
738 /**
739  * finish_reading_super_block() - Continue after loading the super block.
740  * @completion: The super block vio.
741  *
742  * This callback is registered in vdo_load_super_block().
743  */
744 static void finish_reading_super_block(struct vdo_completion *completion)
745 {
746 	struct vdo_super_block *super_block =
747 		container_of(as_vio(completion), struct vdo_super_block, vio);
748 
749 	vdo_continue_completion(vdo_forget(completion->parent),
750 				vdo_decode_super_block(super_block->buffer));
751 }
752 
753 /**
754  * handle_super_block_read_error() - Handle an error reading the super block.
755  * @completion: The super block vio.
756  *
757  * This error handler is registered in vdo_load_super_block().
758  */
759 static void handle_super_block_read_error(struct vdo_completion *completion)
760 {
761 	vio_record_metadata_io_error(as_vio(completion));
762 	finish_reading_super_block(completion);
763 }
764 
765 static void read_super_block_endio(struct bio *bio)
766 {
767 	struct vio *vio = bio->bi_private;
768 	struct vdo_completion *parent = vio->completion.parent;
769 
770 	continue_vio_after_io(vio, finish_reading_super_block,
771 			      parent->callback_thread_id);
772 }
773 
774 /**
775  * vdo_load_super_block() - Allocate a super block and read its contents from storage.
776  * @vdo: The vdo containing the super block on disk.
777  * @parent: The completion to notify after loading the super block.
778  */
779 void vdo_load_super_block(struct vdo *vdo, struct vdo_completion *parent)
780 {
781 	int result;
782 
783 	result = initialize_super_block(vdo, &vdo->super_block);
784 	if (result != VDO_SUCCESS) {
785 		vdo_continue_completion(parent, result);
786 		return;
787 	}
788 
789 	vdo->super_block.vio.completion.parent = parent;
790 	vdo_submit_metadata_vio(&vdo->super_block.vio,
791 				vdo_get_data_region_start(vdo->geometry),
792 				read_super_block_endio,
793 				handle_super_block_read_error,
794 				REQ_OP_READ);
795 }
796 
797 /**
798  * vdo_get_backing_device() - Get the block device object underlying a vdo.
799  * @vdo: The vdo.
800  *
801  * Return: The vdo's current block device.
802  */
803 struct block_device *vdo_get_backing_device(const struct vdo *vdo)
804 {
805 	return vdo->device_config->owned_device->bdev;
806 }
807 
808 /**
809  * vdo_get_device_name() - Get the device name associated with the vdo target.
810  * @target: The target device interface.
811  *
812  * Return: The block device name.
813  */
814 const char *vdo_get_device_name(const struct dm_target *target)
815 {
816 	return dm_device_name(dm_table_get_md(target->table));
817 }
818 
819 /**
820  * vdo_synchronous_flush() - Issue a flush request and wait for it to complete.
821  * @vdo: The vdo.
822  *
823  * Return: VDO_SUCCESS or an error.
824  */
825 int vdo_synchronous_flush(struct vdo *vdo)
826 {
827 	int result;
828 	struct bio bio;
829 
830 	bio_init(&bio, vdo_get_backing_device(vdo), NULL, 0,
831 		 REQ_OP_WRITE | REQ_PREFLUSH);
832 	submit_bio_wait(&bio);
833 	result = blk_status_to_errno(bio.bi_status);
834 
835 	atomic64_inc(&vdo->stats.flush_out);
836 	if (result != 0) {
837 		vdo_log_error_strerror(result, "synchronous flush failed");
838 		result = -EIO;
839 	}
840 
841 	bio_uninit(&bio);
842 	return result;
843 }
844 
845 /**
846  * vdo_get_state() - Get the current state of the vdo.
847  * @vdo: The vdo.
848  *
849  * Context: This method may be called from any thread.
850  *
851  * Return: The current state of the vdo.
852  */
853 enum vdo_state vdo_get_state(const struct vdo *vdo)
854 {
855 	enum vdo_state state = atomic_read(&vdo->state);
856 
857 	/* pairs with barriers where state field is changed */
858 	smp_rmb();
859 	return state;
860 }
861 
862 /**
863  * vdo_set_state() - Set the current state of the vdo.
864  * @vdo: The vdo whose state is to be set.
865  * @state: The new state of the vdo.
866  *
867  * Context: This method may be called from any thread.
868  */
869 void vdo_set_state(struct vdo *vdo, enum vdo_state state)
870 {
871 	/* pairs with barrier in vdo_get_state */
872 	smp_wmb();
873 	atomic_set(&vdo->state, state);
874 }
875 
876 /**
877  * vdo_get_admin_state() - Get the admin state of the vdo.
878  * @vdo: The vdo.
879  *
880  * Return: The code for the vdo's current admin state.
881  */
882 const struct admin_state_code *vdo_get_admin_state(const struct vdo *vdo)
883 {
884 	return vdo_get_admin_state_code(&vdo->admin.state);
885 }
886 
887 /**
888  * record_vdo() - Record the state of the VDO for encoding in the super block.
889  * @vdo: The vdo.
890  */
891 static void record_vdo(struct vdo *vdo)
892 {
893 	/* This is for backwards compatibility. */
894 	vdo->states.unused = vdo->geometry.unused;
895 	vdo->states.vdo.state = vdo_get_state(vdo);
896 	vdo->states.block_map = vdo_record_block_map(vdo->block_map);
897 	vdo->states.recovery_journal = vdo_record_recovery_journal(vdo->recovery_journal);
898 	vdo->states.slab_depot = vdo_record_slab_depot(vdo->depot);
899 	vdo->states.layout = vdo->layout;
900 }
901 
902 /**
903  * continue_super_block_parent() - Continue the parent of a super block save operation.
904  * @completion: The super block vio.
905  *
906  * This callback is registered in vdo_save_components().
907  */
908 static void continue_super_block_parent(struct vdo_completion *completion)
909 {
910 	vdo_continue_completion(vdo_forget(completion->parent), completion->result);
911 }
912 
913 /**
914  * handle_save_error() - Log a super block save error.
915  * @completion: The super block vio.
916  *
917  * This error handler is registered in vdo_save_components().
918  */
919 static void handle_save_error(struct vdo_completion *completion)
920 {
921 	struct vdo_super_block *super_block =
922 		container_of(as_vio(completion), struct vdo_super_block, vio);
923 
924 	vio_record_metadata_io_error(&super_block->vio);
925 	vdo_log_error_strerror(completion->result, "super block save failed");
926 	/*
927 	 * Mark the super block as unwritable so that we won't attempt to write it again. This
928 	 * avoids the case where a growth attempt fails writing the super block with the new size,
929 	 * but the subsequent attempt to write out the read-only state succeeds. In this case,
930 	 * writes which happened just before the suspend would not be visible if the VDO is
931 	 * restarted without rebuilding, but, after a read-only rebuild, the effects of those
932 	 * writes would reappear.
933 	 */
934 	super_block->unwritable = true;
935 	completion->callback(completion);
936 }
937 
938 static void super_block_write_endio(struct bio *bio)
939 {
940 	struct vio *vio = bio->bi_private;
941 	struct vdo_completion *parent = vio->completion.parent;
942 
943 	continue_vio_after_io(vio, continue_super_block_parent,
944 			      parent->callback_thread_id);
945 }
946 
947 /**
948  * vdo_save_components() - Encode the vdo and save the super block asynchronously.
949  * @vdo: The vdo whose state is being saved.
950  * @parent: The completion to notify when the save is complete.
951  */
952 void vdo_save_components(struct vdo *vdo, struct vdo_completion *parent)
953 {
954 	struct vdo_super_block *super_block = &vdo->super_block;
955 
956 	if (super_block->unwritable) {
957 		vdo_continue_completion(parent, VDO_READ_ONLY);
958 		return;
959 	}
960 
961 	if (super_block->vio.completion.parent != NULL) {
962 		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
963 		return;
964 	}
965 
966 	record_vdo(vdo);
967 
968 	vdo_encode_super_block(super_block->buffer, &vdo->states);
969 	super_block->vio.completion.parent = parent;
970 	super_block->vio.completion.callback_thread_id = parent->callback_thread_id;
971 	vdo_submit_metadata_vio(&super_block->vio,
972 				vdo_get_data_region_start(vdo->geometry),
973 				super_block_write_endio, handle_save_error,
974 				REQ_OP_WRITE | REQ_PREFLUSH | REQ_FUA);
975 }
976 
977 /**
978  * vdo_register_read_only_listener() - Register a listener to be notified when the VDO goes
979  *                                     read-only.
980  * @vdo: The vdo to register with.
981  * @listener: The object to notify.
982  * @notification: The function to call to send the notification.
983  * @thread_id: The id of the thread on which to send the notification.
984  *
985  * Return: VDO_SUCCESS or an error.
986  */
987 int vdo_register_read_only_listener(struct vdo *vdo, void *listener,
988 				    vdo_read_only_notification_fn notification,
989 				    thread_id_t thread_id)
990 {
991 	struct vdo_thread *thread = &vdo->threads[thread_id];
992 	struct read_only_listener *read_only_listener;
993 	int result;
994 
995 	result = VDO_ASSERT(thread_id != vdo->thread_config.dedupe_thread,
996 			    "read only listener not registered on dedupe thread");
997 	if (result != VDO_SUCCESS)
998 		return result;
999 
1000 	result = vdo_allocate(1, struct read_only_listener, __func__,
1001 			      &read_only_listener);
1002 	if (result != VDO_SUCCESS)
1003 		return result;
1004 
1005 	*read_only_listener = (struct read_only_listener) {
1006 		.listener = listener,
1007 		.notify = notification,
1008 		.next = thread->listeners,
1009 	};
1010 
1011 	thread->listeners = read_only_listener;
1012 	return VDO_SUCCESS;
1013 }
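
/*
 * Illustrative listener sketch (editorial example; the component type and
 * field names are hypothetical): the notification function receives the
 * registered object and a completion which it must eventually complete,
 * directly or after further asynchronous work, to acknowledge the
 * notification.
 *
 *	static void note_read_only(void *listener, struct vdo_completion *parent)
 *	{
 *		struct my_component *component = listener;
 *
 *		component->read_only = true;
 *		vdo_finish_completion(parent);
 *	}
 *
 *	result = vdo_register_read_only_listener(vdo, component, note_read_only,
 *						 component->thread_id);
 */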
1014 
1015 /**
1016  * notify_vdo_of_read_only_mode() - Notify a vdo that it is going read-only.
1017  * @listener: The vdo.
1018  * @parent: The completion to notify in order to acknowledge the notification.
1019  *
1020  * This will save the read-only state to the super block.
1021  *
1022  * Implements vdo_read_only_notification_fn.
1023  */
1024 static void notify_vdo_of_read_only_mode(void *listener, struct vdo_completion *parent)
1025 {
1026 	struct vdo *vdo = listener;
1027 
1028 	if (vdo_in_read_only_mode(vdo))
1029 		vdo_finish_completion(parent);
1030 
1031 	vdo_set_state(vdo, VDO_READ_ONLY_MODE);
1032 	vdo_save_components(vdo, parent);
1033 }
1034 
1035 /**
1036  * vdo_enable_read_only_entry() - Enable a vdo to enter read-only mode on errors.
1037  * @vdo: The vdo to enable.
1038  *
1039  * Return: VDO_SUCCESS or an error.
1040  */
1041 int vdo_enable_read_only_entry(struct vdo *vdo)
1042 {
1043 	thread_id_t id;
1044 	bool is_read_only = vdo_in_read_only_mode(vdo);
1045 	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1046 
1047 	if (is_read_only) {
1048 		notifier->read_only_error = VDO_READ_ONLY;
1049 		notifier->state = NOTIFIED;
1050 	} else {
1051 		notifier->state = MAY_NOT_NOTIFY;
1052 	}
1053 
1054 	spin_lock_init(&notifier->lock);
1055 	vdo_initialize_completion(&notifier->completion, vdo,
1056 				  VDO_READ_ONLY_MODE_COMPLETION);
1057 
1058 	for (id = 0; id < vdo->thread_config.thread_count; id++)
1059 		vdo->threads[id].is_read_only = is_read_only;
1060 
1061 	return vdo_register_read_only_listener(vdo, vdo, notify_vdo_of_read_only_mode,
1062 					       vdo->thread_config.admin_thread);
1063 }
1064 
1065 /**
1066  * vdo_wait_until_not_entering_read_only_mode() - Wait until no read-only notifications are in
1067  *                                                progress and prevent any subsequent
1068  *                                                notifications.
1069  * @parent: The completion to notify when no threads are entering read-only mode.
1070  *
1071  * Notifications may be re-enabled by calling vdo_allow_read_only_mode_entry().
1072  */
1073 void vdo_wait_until_not_entering_read_only_mode(struct vdo_completion *parent)
1074 {
1075 	struct vdo *vdo = parent->vdo;
1076 	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1077 
1078 	vdo_assert_on_admin_thread(vdo, __func__);
1079 
1080 	if (notifier->waiter != NULL) {
1081 		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
1082 		return;
1083 	}
1084 
1085 	spin_lock(&notifier->lock);
1086 	if (notifier->state == NOTIFYING)
1087 		notifier->waiter = parent;
1088 	else if (notifier->state == MAY_NOTIFY)
1089 		notifier->state = MAY_NOT_NOTIFY;
1090 	spin_unlock(&notifier->lock);
1091 
1092 	if (notifier->waiter == NULL) {
1093 		/*
1094 		 * A notification was not in progress, and now they are
1095 		 * disallowed.
1096 		 */
1097 		vdo_launch_completion(parent);
1098 		return;
1099 	}
1100 }
1101 
1102 /**
1103  * as_notifier() - Convert a generic vdo_completion to a read_only_notifier.
1104  * @completion: The completion to convert.
1105  *
1106  * Return: The completion as a read_only_notifier.
1107  */
1108 static inline struct read_only_notifier *as_notifier(struct vdo_completion *completion)
1109 {
1110 	vdo_assert_completion_type(completion, VDO_READ_ONLY_MODE_COMPLETION);
1111 	return container_of(completion, struct read_only_notifier, completion);
1112 }
1113 
1114 /**
1115  * finish_entering_read_only_mode() - Complete the process of entering read only mode.
1116  * @completion: The read-only mode completion.
1117  */
1118 static void finish_entering_read_only_mode(struct vdo_completion *completion)
1119 {
1120 	struct read_only_notifier *notifier = as_notifier(completion);
1121 
1122 	vdo_assert_on_admin_thread(completion->vdo, __func__);
1123 
1124 	spin_lock(&notifier->lock);
1125 	notifier->state = NOTIFIED;
1126 	spin_unlock(&notifier->lock);
1127 
1128 	if (notifier->waiter != NULL)
1129 		vdo_continue_completion(vdo_forget(notifier->waiter),
1130 					completion->result);
1131 }
1132 
1133 /**
1134  * make_thread_read_only() - Inform each thread that the VDO is in read-only mode.
1135  * @completion: The read-only mode completion.
1136  */
1137 static void make_thread_read_only(struct vdo_completion *completion)
1138 {
1139 	struct vdo *vdo = completion->vdo;
1140 	thread_id_t thread_id = completion->callback_thread_id;
1141 	struct read_only_notifier *notifier = as_notifier(completion);
1142 	struct read_only_listener *listener = completion->parent;
1143 
1144 	if (listener == NULL) {
1145 		/* This is the first call on this thread */
1146 		struct vdo_thread *thread = &vdo->threads[thread_id];
1147 
1148 		thread->is_read_only = true;
1149 		listener = thread->listeners;
1150 		if (thread_id == 0)
1151 			vdo_log_error_strerror(READ_ONCE(notifier->read_only_error),
1152 					       "Unrecoverable error, entering read-only mode");
1153 	} else {
1154 		/* We've just finished notifying a listener */
1155 		listener = listener->next;
1156 	}
1157 
1158 	if (listener != NULL) {
1159 		/* We have a listener to notify */
1160 		vdo_prepare_completion(completion, make_thread_read_only,
1161 				       make_thread_read_only, thread_id,
1162 				       listener);
1163 		listener->notify(listener->listener, completion);
1164 		return;
1165 	}
1166 
1167 	/* We're done with this thread */
1168 	if (++thread_id == vdo->thread_config.dedupe_thread) {
1169 		/*
1170 		 * We don't want to notify the dedupe thread since it may be
1171 		 * blocked rebuilding the index.
1172 		 */
1173 		thread_id++;
1174 	}
1175 
1176 	if (thread_id >= vdo->thread_config.thread_count) {
1177 		/* There are no more threads */
1178 		vdo_prepare_completion(completion, finish_entering_read_only_mode,
1179 				       finish_entering_read_only_mode,
1180 				       vdo->thread_config.admin_thread, NULL);
1181 	} else {
1182 		vdo_prepare_completion(completion, make_thread_read_only,
1183 				       make_thread_read_only, thread_id, NULL);
1184 	}
1185 
1186 	vdo_launch_completion(completion);
1187 }
1188 
1189 /**
1190  * vdo_allow_read_only_mode_entry() - Allow the notifier to put the VDO into read-only mode,
1191  *                                    reversing the effects of
1192  *                                    vdo_wait_until_not_entering_read_only_mode().
1193  * @parent: The object to notify once the operation is complete.
1194  *
1195  * If some thread tried to put the vdo into read-only mode while notifications were disallowed, it
1196  * will be done when this method is called. If that happens, the parent will not be notified until
1197  * the vdo has actually entered read-only mode and attempted to save the super block.
1198  *
1199  * Context: This method may only be called from the admin thread.
1200  */
1201 void vdo_allow_read_only_mode_entry(struct vdo_completion *parent)
1202 {
1203 	struct vdo *vdo = parent->vdo;
1204 	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1205 
1206 	vdo_assert_on_admin_thread(vdo, __func__);
1207 
1208 	if (notifier->waiter != NULL) {
1209 		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
1210 		return;
1211 	}
1212 
1213 	spin_lock(&notifier->lock);
1214 	if (notifier->state == MAY_NOT_NOTIFY) {
1215 		if (notifier->read_only_error == VDO_SUCCESS) {
1216 			notifier->state = MAY_NOTIFY;
1217 		} else {
1218 			notifier->state = NOTIFYING;
1219 			notifier->waiter = parent;
1220 		}
1221 	}
1222 	spin_unlock(&notifier->lock);
1223 
1224 	if (notifier->waiter == NULL) {
1225 		/* We're done */
1226 		vdo_launch_completion(parent);
1227 		return;
1228 	}
1229 
1230 	/* Do the pending notification. */
1231 	make_thread_read_only(&notifier->completion);
1232 }
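
/*
 * Editorial note on pairing (hedged sketch): a suspend path disables
 * notifications and the matching resume path re-enables them, letting any
 * read-only entry deferred in the meantime proceed:
 *
 *	vdo_wait_until_not_entering_read_only_mode(suspend_completion);
 *	... suspended: any vdo_enter_read_only_mode() call is deferred ...
 *	vdo_allow_read_only_mode_entry(resume_completion);
 */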
1233 
1234 /**
1235  * vdo_enter_read_only_mode() - Put a VDO into read-only mode and save the read-only state in the
1236  *                              super block.
1237  * @vdo: The vdo.
1238  * @error_code: The error which caused the VDO to enter read-only mode.
1239  *
1240  * This method is a no-op if the VDO is already read-only.
1241  */
1242 void vdo_enter_read_only_mode(struct vdo *vdo, int error_code)
1243 {
1244 	bool notify = false;
1245 	thread_id_t thread_id = vdo_get_callback_thread_id();
1246 	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1247 	struct vdo_thread *thread;
1248 
1249 	if (thread_id != VDO_INVALID_THREAD_ID) {
1250 		thread = &vdo->threads[thread_id];
1251 		if (thread->is_read_only) {
1252 			/* This thread has already gone read-only. */
1253 			return;
1254 		}
1255 
1256 		/* Record for this thread that the VDO is read-only. */
1257 		thread->is_read_only = true;
1258 	}
1259 
1260 	spin_lock(&notifier->lock);
1261 	if (notifier->read_only_error == VDO_SUCCESS) {
1262 		WRITE_ONCE(notifier->read_only_error, error_code);
1263 		if (notifier->state == MAY_NOTIFY) {
1264 			notifier->state = NOTIFYING;
1265 			notify = true;
1266 		}
1267 	}
1268 	spin_unlock(&notifier->lock);
1269 
1270 	if (!notify) {
1271 		/* The notifier is already aware of a read-only error */
1272 		return;
1273 	}
1274 
1275 	/* Initiate a notification starting on the lowest numbered thread. */
1276 	vdo_launch_completion_callback(&notifier->completion, make_thread_read_only, 0);
1277 }
1278 
1279 /**
1280  * vdo_is_read_only() - Check whether the VDO is read-only.
1281  * @vdo: The vdo.
1282  *
1283  * Return: True if the vdo is read-only.
1284  *
1285  * This method may be called from any thread, as opposed to examining the VDO's state field which
1286  * is only safe to check from the admin thread.
1287  */
1288 bool vdo_is_read_only(struct vdo *vdo)
1289 {
1290 	return vdo->threads[vdo_get_callback_thread_id()].is_read_only;
1291 }
1292 
1293 /**
1294  * vdo_in_read_only_mode() - Check whether a vdo is in read-only mode.
1295  * @vdo: The vdo to query.
1296  *
1297  * Return: True if the vdo is in read-only mode.
1298  */
1299 bool vdo_in_read_only_mode(const struct vdo *vdo)
1300 {
1301 	return (vdo_get_state(vdo) == VDO_READ_ONLY_MODE);
1302 }
1303 
1304 /**
1305  * vdo_in_recovery_mode() - Check whether the vdo is in recovery mode.
1306  * @vdo: The vdo to query.
1307  *
1308  * Return: True if the vdo is in recovery mode.
1309  */
1310 bool vdo_in_recovery_mode(const struct vdo *vdo)
1311 {
1312 	return (vdo_get_state(vdo) == VDO_RECOVERING);
1313 }
1314 
1315 /**
1316  * vdo_enter_recovery_mode() - Put the vdo into recovery mode.
1317  * @vdo: The vdo.
1318  */
1319 void vdo_enter_recovery_mode(struct vdo *vdo)
1320 {
1321 	vdo_assert_on_admin_thread(vdo, __func__);
1322 
1323 	if (vdo_in_read_only_mode(vdo))
1324 		return;
1325 
1326 	vdo_log_info("Entering recovery mode");
1327 	vdo_set_state(vdo, VDO_RECOVERING);
1328 }
1329 
1330 /**
1331  * complete_synchronous_action() - Signal the waiting thread that a synchronous action is complete.
1332  * @completion: The sync completion.
1333  */
1334 static void complete_synchronous_action(struct vdo_completion *completion)
1335 {
1336 	vdo_assert_completion_type(completion, VDO_SYNC_COMPLETION);
1337 	complete(&(container_of(completion, struct sync_completion,
1338 				vdo_completion)->completion));
1339 }
1340 
1341 /**
1342  * perform_synchronous_action() - Launch an action on a VDO thread and wait for it to complete.
1343  * @vdo: The vdo.
1344  * @action: The callback to launch.
1345  * @thread_id: The thread on which to run the action.
1346  * @parent: The parent of the sync completion (may be NULL).
1347  */
1348 static int perform_synchronous_action(struct vdo *vdo, vdo_action_fn action,
1349 				      thread_id_t thread_id, void *parent)
1350 {
1351 	struct sync_completion sync;
1352 
1353 	vdo_initialize_completion(&sync.vdo_completion, vdo, VDO_SYNC_COMPLETION);
1354 	init_completion(&sync.completion);
1355 	sync.vdo_completion.parent = parent;
1356 	vdo_launch_completion_callback(&sync.vdo_completion, action, thread_id);
1357 	wait_for_completion(&sync.completion);
1358 	return sync.vdo_completion.result;
1359 }
1360 
1361 /**
1362  * set_compression_callback() - Callback to turn compression on or off.
1363  * @completion: The completion.
1364  */
1365 static void set_compression_callback(struct vdo_completion *completion)
1366 {
1367 	struct vdo *vdo = completion->vdo;
1368 	bool *enable = completion->parent;
1369 	bool was_enabled = vdo_get_compressing(vdo);
1370 
1371 	if (*enable != was_enabled) {
1372 		WRITE_ONCE(vdo->compressing, *enable);
1373 		if (was_enabled) {
1374 			/* Signal the packer to flush since compression has been disabled. */
1375 			vdo_flush_packer(vdo->packer);
1376 		}
1377 	}
1378 
1379 	vdo_log_info("compression is %s", (*enable ? "enabled" : "disabled"));
1380 	*enable = was_enabled;
1381 	complete_synchronous_action(completion);
1382 }
1383 
1384 /**
1385  * vdo_set_compressing() - Turn compression on or off.
1386  * @vdo: The vdo.
1387  * @enable: Whether to enable or disable compression.
1388  *
1389  * Return: Whether compression was previously on or off.
1390  */
1391 bool vdo_set_compressing(struct vdo *vdo, bool enable)
1392 {
1393 	perform_synchronous_action(vdo, set_compression_callback,
1394 				   vdo->thread_config.packer_thread,
1395 				   &enable);
1396 	return enable;
1397 }
1398 
1399 /**
1400  * vdo_get_compressing() - Get whether compression is enabled in a vdo.
1401  * @vdo: The vdo.
1402  *
1403  * Return: State of compression.
1404  */
1405 bool vdo_get_compressing(struct vdo *vdo)
1406 {
1407 	return READ_ONCE(vdo->compressing);
1408 }
1409 
1410 static size_t get_block_map_cache_size(const struct vdo *vdo)
1411 {
1412 	return ((size_t) vdo->device_config->cache_size) * VDO_BLOCK_SIZE;
1413 }
1414 
1415 static struct error_statistics __must_check get_vdo_error_statistics(const struct vdo *vdo)
1416 {
1417 	/*
1418 	 * The error counts can be incremented from arbitrary threads and so must be incremented
1419 	 * atomically, but they are just statistics with no semantics that could rely on memory
1420 	 * order, so unfenced reads are sufficient.
1421 	 */
1422 	const struct atomic_statistics *atoms = &vdo->stats;
1423 
1424 	return (struct error_statistics) {
1425 		.invalid_advice_pbn_count = atomic64_read(&atoms->invalid_advice_pbn_count),
1426 		.no_space_error_count = atomic64_read(&atoms->no_space_error_count),
1427 		.read_only_error_count = atomic64_read(&atoms->read_only_error_count),
1428 	};
1429 }
1430 
1431 static void copy_bio_stat(struct bio_stats *b, const struct atomic_bio_stats *a)
1432 {
1433 	b->read = atomic64_read(&a->read);
1434 	b->write = atomic64_read(&a->write);
1435 	b->discard = atomic64_read(&a->discard);
1436 	b->flush = atomic64_read(&a->flush);
1437 	b->empty_flush = atomic64_read(&a->empty_flush);
1438 	b->fua = atomic64_read(&a->fua);
1439 }
1440 
1441 static struct bio_stats subtract_bio_stats(struct bio_stats minuend,
1442 					   struct bio_stats subtrahend)
1443 {
1444 	return (struct bio_stats) {
1445 		.read = minuend.read - subtrahend.read,
1446 		.write = minuend.write - subtrahend.write,
1447 		.discard = minuend.discard - subtrahend.discard,
1448 		.flush = minuend.flush - subtrahend.flush,
1449 		.empty_flush = minuend.empty_flush - subtrahend.empty_flush,
1450 		.fua = minuend.fua - subtrahend.fua,
1451 	};
1452 }
1453 
1454 /**
1455  * vdo_get_physical_blocks_allocated() - Get the number of physical blocks in use by user data.
1456  * @vdo: The vdo.
1457  *
1458  * Return: The number of blocks allocated for user data.
1459  */
1460 static block_count_t __must_check vdo_get_physical_blocks_allocated(const struct vdo *vdo)
1461 {
1462 	return (vdo_get_slab_depot_allocated_blocks(vdo->depot) -
1463 		vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal));
1464 }
1465 
1466 /**
1467  * vdo_get_physical_blocks_overhead() - Get the number of physical blocks used by vdo metadata.
1468  * @vdo: The vdo.
1469  *
1470  * Return: The number of overhead blocks.
1471  */
1472 static block_count_t __must_check vdo_get_physical_blocks_overhead(const struct vdo *vdo)
1473 {
1474 	/*
1475 	 * config.physical_blocks is mutated during resize and is in a packed structure,
1476 	 * but resize runs on admin thread.
1477 	 * TODO: Verify that this is always safe.
1478 	 */
1479 	return (vdo->states.vdo.config.physical_blocks -
1480 		vdo_get_slab_depot_data_blocks(vdo->depot) +
1481 		vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal));
1482 }
1483 
1484 static const char *vdo_describe_state(enum vdo_state state)
1485 {
1486 	/* These strings should all fit in the 15 chars of VDOStatistics.mode. */
1487 	switch (state) {
1488 	case VDO_RECOVERING:
1489 		return "recovering";
1490 
1491 	case VDO_READ_ONLY_MODE:
1492 		return "read-only";
1493 
1494 	default:
1495 		return "normal";
1496 	}
1497 }
1498 
1499 /**
1500  * get_vdo_statistics() - Populate a vdo_statistics structure on the admin thread.
1501  * @vdo: The vdo.
1502  * @stats: The statistics structure to populate.
1503  */
1504 static void get_vdo_statistics(const struct vdo *vdo, struct vdo_statistics *stats)
1505 {
1506 	struct recovery_journal *journal = vdo->recovery_journal;
1507 	enum vdo_state state = vdo_get_state(vdo);
1508 
1509 	vdo_assert_on_admin_thread(vdo, __func__);
1510 
1511 	/* start with a clean slate */
1512 	memset(stats, 0, sizeof(struct vdo_statistics));
1513 
1514 	/*
1515 	 * These are immutable properties of the vdo object, so it is safe to query them from any
1516 	 * thread.
1517 	 */
1518 	stats->version = STATISTICS_VERSION;
1519 	stats->logical_blocks = vdo->states.vdo.config.logical_blocks;
1520 	/*
1521 	 * config.physical_blocks is mutated during resize and is in a packed structure, but resize
1522 	 * runs on the admin thread.
1523 	 * TODO: verify that this is always safe
1524 	 */
1525 	stats->physical_blocks = vdo->states.vdo.config.physical_blocks;
1526 	stats->block_size = VDO_BLOCK_SIZE;
1527 	stats->complete_recoveries = vdo->states.vdo.complete_recoveries;
1528 	stats->read_only_recoveries = vdo->states.vdo.read_only_recoveries;
1529 	stats->block_map_cache_size = get_block_map_cache_size(vdo);
1530 
1531 	/* The callees are responsible for thread-safety. */
1532 	stats->data_blocks_used = vdo_get_physical_blocks_allocated(vdo);
1533 	stats->overhead_blocks_used = vdo_get_physical_blocks_overhead(vdo);
1534 	stats->logical_blocks_used = vdo_get_recovery_journal_logical_blocks_used(journal);
1535 	vdo_get_slab_depot_statistics(vdo->depot, stats);
1536 	stats->journal = vdo_get_recovery_journal_statistics(journal);
1537 	stats->packer = vdo_get_packer_statistics(vdo->packer);
1538 	stats->block_map = vdo_get_block_map_statistics(vdo->block_map);
1539 	vdo_get_dedupe_statistics(vdo->hash_zones, stats);
1540 	stats->errors = get_vdo_error_statistics(vdo);
1541 	stats->in_recovery_mode = (state == VDO_RECOVERING);
1542 	snprintf(stats->mode, sizeof(stats->mode), "%s", vdo_describe_state(state));
1543 
1544 	stats->instance = vdo->instance;
1545 	stats->current_vios_in_progress = get_data_vio_pool_active_requests(vdo->data_vio_pool);
1546 	stats->max_vios = get_data_vio_pool_maximum_requests(vdo->data_vio_pool);
1547 
1548 	stats->flush_out = atomic64_read(&vdo->stats.flush_out);
1549 	stats->logical_block_size = vdo->device_config->logical_block_size;
1550 	copy_bio_stat(&stats->bios_in, &vdo->stats.bios_in);
1551 	copy_bio_stat(&stats->bios_in_partial, &vdo->stats.bios_in_partial);
1552 	copy_bio_stat(&stats->bios_out, &vdo->stats.bios_out);
1553 	copy_bio_stat(&stats->bios_meta, &vdo->stats.bios_meta);
1554 	copy_bio_stat(&stats->bios_journal, &vdo->stats.bios_journal);
1555 	copy_bio_stat(&stats->bios_page_cache, &vdo->stats.bios_page_cache);
1556 	copy_bio_stat(&stats->bios_out_completed, &vdo->stats.bios_out_completed);
1557 	copy_bio_stat(&stats->bios_meta_completed, &vdo->stats.bios_meta_completed);
1558 	copy_bio_stat(&stats->bios_journal_completed,
1559 		      &vdo->stats.bios_journal_completed);
1560 	copy_bio_stat(&stats->bios_page_cache_completed,
1561 		      &vdo->stats.bios_page_cache_completed);
1562 	copy_bio_stat(&stats->bios_acknowledged, &vdo->stats.bios_acknowledged);
1563 	copy_bio_stat(&stats->bios_acknowledged_partial, &vdo->stats.bios_acknowledged_partial);
1564 	stats->bios_in_progress =
1565 		subtract_bio_stats(stats->bios_in, stats->bios_acknowledged);
1566 	vdo_get_memory_stats(&stats->memory_usage.bytes_used,
1567 			     &stats->memory_usage.peak_bytes_used);
1568 }
1569 
/**
 * vdo_fetch_statistics_callback() - Action to populate a vdo_statistics
 *                                   structure on the admin thread.
 * @completion: The completion.
 *
 * This callback is registered in vdo_fetch_statistics().
 */
static void vdo_fetch_statistics_callback(struct vdo_completion *completion)
{
	get_vdo_statistics(completion->vdo, completion->parent);
	complete_synchronous_action(completion);
}

/**
 * vdo_fetch_statistics() - Fetch statistics on the correct thread.
 * @vdo: The vdo.
 * @stats: The vdo statistics are returned here.
 */
void vdo_fetch_statistics(struct vdo *vdo, struct vdo_statistics *stats)
{
	perform_synchronous_action(vdo, vdo_fetch_statistics_callback,
				   vdo->thread_config.admin_thread, stats);
}

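/*
 * Illustrative usage sketch (assumed caller context, not part of the original source). Because
 * vdo_fetch_statistics() runs a synchronous action on the admin thread, a caller on some other
 * thread can simply do:
 *
 *	struct vdo_statistics stats;
 *
 *	vdo_fetch_statistics(vdo, &stats);
 *	vdo_log_info("%llu logical blocks used", (unsigned long long) stats.logical_blocks_used);
 *
 * The logging call is only an example of consuming the result.
 */
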
/**
 * vdo_get_callback_thread_id() - Get the id of the callback thread on which a completion is
 *                                currently running.
 *
 * Return: The current thread ID, or VDO_INVALID_THREAD_ID if the current thread is not a vdo
 *         callback thread.
 */
thread_id_t vdo_get_callback_thread_id(void)
{
	struct vdo_work_queue *queue = vdo_get_current_work_queue();
	struct vdo_thread *thread;
	thread_id_t thread_id;

	if (queue == NULL)
		return VDO_INVALID_THREAD_ID;

	thread = vdo_get_work_queue_owner(queue);
	thread_id = thread->thread_id;

	if (PARANOID_THREAD_CONSISTENCY_CHECKS) {
		BUG_ON(thread_id >= thread->vdo->thread_config.thread_count);
		BUG_ON(thread != &thread->vdo->threads[thread_id]);
	}

	return thread_id;
}

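/*
 * Illustrative sketch (assumed, not part of the original source): the returned id can be
 * compared against a configured thread to decide whether work may run in place or must be
 * enqueued elsewhere. The helpers named here are hypothetical:
 *
 *	if (vdo_get_callback_thread_id() == vdo->thread_config.admin_thread)
 *		do_admin_work(vdo);            // hypothetical: already on the admin thread
 *	else
 *		enqueue_admin_work(vdo);       // hypothetical: hand off to the admin thread
 */
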
/**
 * vdo_dump_status() - Dump status information about a vdo to the log for debugging.
 * @vdo: The vdo to dump.
 */
void vdo_dump_status(const struct vdo *vdo)
{
	zone_count_t zone;

	vdo_dump_flusher(vdo->flusher);
	vdo_dump_recovery_journal_statistics(vdo->recovery_journal);
	vdo_dump_packer(vdo->packer);
	vdo_dump_slab_depot(vdo->depot);

	for (zone = 0; zone < vdo->thread_config.logical_zone_count; zone++)
		vdo_dump_logical_zone(&vdo->logical_zones->zones[zone]);

	for (zone = 0; zone < vdo->thread_config.physical_zone_count; zone++)
		vdo_dump_physical_zone(&vdo->physical_zones->zones[zone]);

	vdo_dump_hash_zones(vdo->hash_zones);
}

/**
 * vdo_assert_on_admin_thread() - Assert that we are running on the admin thread.
 * @vdo: The vdo.
 * @name: The name of the function which should be running on the admin thread (for logging).
 */
void vdo_assert_on_admin_thread(const struct vdo *vdo, const char *name)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.admin_thread),
			    "%s called on admin thread", name);
}

/**
 * vdo_assert_on_logical_zone_thread() - Assert that this function was called on the specified
 *                                       logical zone thread.
 * @vdo: The vdo.
 * @logical_zone: The number of the logical zone.
 * @name: The name of the calling function.
 */
void vdo_assert_on_logical_zone_thread(const struct vdo *vdo, zone_count_t logical_zone,
				       const char *name)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() ==
			     vdo->thread_config.logical_threads[logical_zone]),
			    "%s called on logical thread", name);
}

/**
 * vdo_assert_on_physical_zone_thread() - Assert that this function was called on the specified
 *                                        physical zone thread.
 * @vdo: The vdo.
 * @physical_zone: The number of the physical zone.
 * @name: The name of the calling function.
 */
void vdo_assert_on_physical_zone_thread(const struct vdo *vdo,
					zone_count_t physical_zone, const char *name)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() ==
			     vdo->thread_config.physical_threads[physical_zone]),
			    "%s called on physical thread", name);
}

/**
 * vdo_get_physical_zone() - Get the physical zone responsible for a given physical block number.
 * @vdo: The vdo containing the physical zones.
 * @pbn: The PBN of the data block.
 * @zone_ptr: A pointer to return the physical zone.
 *
 * Gets the physical zone responsible for a given physical block number of a data block in this vdo
 * instance, or of the zero block (for which a NULL zone is returned). For any other block number
 * that is not in the range of valid data block numbers in any slab, an error will be returned.
 * This function is safe to call on invalid block numbers; it will not put the vdo into read-only
 * mode.
 *
 * Return: VDO_SUCCESS, VDO_OUT_OF_RANGE if the block number is invalid, or an error code for any
 *         other failure.
 */
int vdo_get_physical_zone(const struct vdo *vdo, physical_block_number_t pbn,
			  struct physical_zone **zone_ptr)
{
	struct vdo_slab *slab;
	int result;

	if (pbn == VDO_ZERO_BLOCK) {
		*zone_ptr = NULL;
		return VDO_SUCCESS;
	}

	/*
	 * Used because it does a more restrictive bounds check than vdo_get_slab(), and done first
	 * because it won't trigger read-only mode on an invalid PBN.
	 */
	if (!vdo_is_physical_data_block(vdo->depot, pbn))
		return VDO_OUT_OF_RANGE;

	/* With the PBN already checked, we should always succeed in finding a slab. */
	slab = vdo_get_slab(vdo->depot, pbn);
	result = VDO_ASSERT(slab != NULL, "vdo_get_slab must succeed on all valid PBNs");
	if (result != VDO_SUCCESS)
		return result;

	*zone_ptr = &vdo->physical_zones->zones[slab->allocator->zone_number];
	return VDO_SUCCESS;
}

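/*
 * Illustrative caller sketch (assumed, not part of the original source): users of
 * vdo_get_physical_zone() must handle both the NULL zone returned for the zero block and the
 * VDO_OUT_OF_RANGE error for invalid PBNs:
 *
 *	struct physical_zone *zone;
 *	int result = vdo_get_physical_zone(vdo, pbn, &zone);
 *
 *	if (result != VDO_SUCCESS)
 *		return result;
 *	if (zone == NULL)
 *		return VDO_SUCCESS;	// pbn was the zero block
 */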