1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3 * Copyright 2023 Red Hat
4 */
5
6 /*
7 * This file contains the main entry points for normal operations on a vdo as well as functions for
8 * constructing and destroying vdo instances (in memory).
9 */
10
11 /**
12 * DOC:
13 *
14 * A read_only_notifier has a single completion which is used to perform read-only notifications;
15 * however, vdo_enter_read_only_mode() may be called from any thread. A pair of fields, protected
16 * by a spinlock, are used to control the read-only mode entry process. The first field holds the
17 * read-only error. The second is the state field, which may hold any of the four special values
18 * enumerated here.
19 *
20 * When vdo_enter_read_only_mode() is called from some vdo thread, if the read_only_error field
21 * already contains an error (i.e. its value is not VDO_SUCCESS), then some other error has already
22 * initiated the read-only process, and nothing more is done. Otherwise, the new error is stored in
23 * the read_only_error field, and the state field is consulted. If the state is MAY_NOTIFY, it is
24 * set to NOTIFYING, and the notification process begins. If the state is MAY_NOT_NOTIFY, then
25 * notifications are currently disallowed, generally due to the vdo being suspended. In this case,
26 * nothing more will be done until the vdo is resumed, at which point the notification will be
27 * performed. In any other case, the vdo is already read-only, and there is nothing more to do.
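 *
 * A rough sketch of the transitions described above (state names as used by the notifier code
 * later in this file; this is a summary of that code, not a separate specification):
 *
 *   MAY_NOT_NOTIFY --vdo_allow_read_only_mode_entry(), no pending error--> MAY_NOTIFY
 *   MAY_NOT_NOTIFY --vdo_allow_read_only_mode_entry(), pending error-----> NOTIFYING
 *   MAY_NOTIFY     --vdo_enter_read_only_mode() records an error---------> NOTIFYING
 *   MAY_NOTIFY     --vdo_wait_until_not_entering_read_only_mode()--------> MAY_NOT_NOTIFY
 *   NOTIFYING      --finish_entering_read_only_mode()--------------------> NOTIFIED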
28 */
29
30 #include "vdo.h"
31
32 #include <linux/completion.h>
33 #include <linux/device-mapper.h>
34 #include <linux/lz4.h>
35 #include <linux/mutex.h>
36 #include <linux/spinlock.h>
37 #include <linux/types.h>
38
39 #include "logger.h"
40 #include "memory-alloc.h"
41 #include "permassert.h"
42 #include "string-utils.h"
43
44 #include "block-map.h"
45 #include "completion.h"
46 #include "data-vio.h"
47 #include "dedupe.h"
48 #include "encodings.h"
49 #include "funnel-workqueue.h"
50 #include "io-submitter.h"
51 #include "logical-zone.h"
52 #include "packer.h"
53 #include "physical-zone.h"
54 #include "recovery-journal.h"
55 #include "slab-depot.h"
56 #include "statistics.h"
57 #include "status-codes.h"
58 #include "vio.h"
59
60 #define PARANOID_THREAD_CONSISTENCY_CHECKS 0
61
62 struct sync_completion {
63 struct vdo_completion vdo_completion;
64 struct completion completion;
65 };
66
67 /* A linked list is adequate for the small number of entries we expect. */
68 struct device_registry {
69 struct list_head links;
70 /* TODO: Convert to rcu per kernel recommendation. */
71 rwlock_t lock;
72 };
73
74 static struct device_registry registry;
75
76 /**
77 * vdo_initialize_device_registry_once() - Initialize the necessary structures for the device
78 * registry.
79 */
80 void vdo_initialize_device_registry_once(void)
81 {
82 	INIT_LIST_HEAD(&registry.links);
83 	rwlock_init(&registry.lock);
84 }
85
86 /** vdo_is_equal() - Implements vdo_filter_fn. */
87 static bool vdo_is_equal(struct vdo *vdo, const void *context)
88 {
89 return (vdo == context);
90 }
91
92 /**
93 * filter_vdos_locked() - Find a vdo in the registry if it exists there.
94 * @filter: The filter function to apply to devices.
95 * @context: A bit of context to provide the filter.
96 *
97 * Context: Must be called holding the lock.
98 *
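 * For example, register_vdo() below calls filter_vdos_locked(vdo_is_equal, vdo) while holding the
 * write lock to check that a vdo is not already present before adding it.
 *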
99 * Return: the vdo object found, if any.
100 */
101 static struct vdo * __must_check filter_vdos_locked(vdo_filter_fn filter,
102 const void *context)
103 {
104 struct vdo *vdo;
105
106 	list_for_each_entry(vdo, &registry.links, registration) {
107 if (filter(vdo, context))
108 return vdo;
109 }
110
111 return NULL;
112 }
113
114 /**
115 * vdo_find_matching() - Find and return the first (if any) vdo matching a given filter function.
116 * @filter: The filter function to apply to vdos.
117 * @context: A bit of context to provide the filter.
118 */
119 struct vdo *vdo_find_matching(vdo_filter_fn filter, const void *context)
120 {
121 struct vdo *vdo;
122
123 	read_lock(&registry.lock);
124 vdo = filter_vdos_locked(filter, context);
125 	read_unlock(&registry.lock);
126
127 return vdo;
128 }
129
130 static void start_vdo_request_queue(void *ptr)
131 {
132 struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue());
133
134 vdo_register_allocating_thread(&thread->allocating_thread,
135 &thread->vdo->allocations_allowed);
136 }
137
138 static void finish_vdo_request_queue(void *ptr)
139 {
140 vdo_unregister_allocating_thread();
141 }
142
143 static const struct vdo_work_queue_type default_queue_type = {
144 .start = start_vdo_request_queue,
145 .finish = finish_vdo_request_queue,
146 .max_priority = VDO_DEFAULT_Q_MAX_PRIORITY,
147 .default_priority = VDO_DEFAULT_Q_COMPLETION_PRIORITY,
148 };
149
150 static const struct vdo_work_queue_type bio_ack_q_type = {
151 .start = NULL,
152 .finish = NULL,
153 .max_priority = BIO_ACK_Q_MAX_PRIORITY,
154 .default_priority = BIO_ACK_Q_ACK_PRIORITY,
155 };
156
157 static const struct vdo_work_queue_type cpu_q_type = {
158 .start = NULL,
159 .finish = NULL,
160 .max_priority = CPU_Q_MAX_PRIORITY,
161 .default_priority = CPU_Q_MAX_PRIORITY,
162 };
163
164 static void uninitialize_thread_config(struct thread_config *config)
165 {
166 vdo_free(vdo_forget(config->logical_threads));
167 vdo_free(vdo_forget(config->physical_threads));
168 vdo_free(vdo_forget(config->hash_zone_threads));
169 vdo_free(vdo_forget(config->bio_threads));
170 memset(config, 0, sizeof(struct thread_config));
171 }
172
173 static void assign_thread_ids(struct thread_config *config,
174 thread_id_t thread_ids[], zone_count_t count)
175 {
176 zone_count_t zone;
177
178 for (zone = 0; zone < count; zone++)
179 thread_ids[zone] = config->thread_count++;
180 }
181
182 /**
183 * initialize_thread_config() - Initialize the thread mapping
184 *
185 * If the logical, physical, and hash zone counts are all 0, a single thread will be shared by all
186 * three plus the packer and recovery journal. Otherwise, there must be at least one of each type,
187 * and each will have its own thread, as will the packer and recovery journal.
188 *
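 * For example (an illustrative reading of the assignments below): with one logical, one physical,
 * and one hash zone, the ids come out as admin/journal = 0, packer = 1, logical = 2, physical = 3,
 * hash = 4, dedupe = 5, followed by the optional bio-ack thread, the cpu thread, and the bio
 * threads.
 *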
189 * Return: VDO_SUCCESS or an error.
190 */
191 static int __must_check initialize_thread_config(struct thread_count_config counts,
192 struct thread_config *config)
193 {
194 int result;
195 bool single = ((counts.logical_zones + counts.physical_zones + counts.hash_zones) == 0);
196
197 config->bio_thread_count = counts.bio_threads;
198 if (single) {
199 config->logical_zone_count = 1;
200 config->physical_zone_count = 1;
201 config->hash_zone_count = 1;
202 } else {
203 config->logical_zone_count = counts.logical_zones;
204 config->physical_zone_count = counts.physical_zones;
205 config->hash_zone_count = counts.hash_zones;
206 }
207
208 result = vdo_allocate(config->logical_zone_count, thread_id_t,
209 "logical thread array", &config->logical_threads);
210 if (result != VDO_SUCCESS) {
211 uninitialize_thread_config(config);
212 return result;
213 }
214
215 result = vdo_allocate(config->physical_zone_count, thread_id_t,
216 "physical thread array", &config->physical_threads);
217 if (result != VDO_SUCCESS) {
218 uninitialize_thread_config(config);
219 return result;
220 }
221
222 result = vdo_allocate(config->hash_zone_count, thread_id_t,
223 "hash thread array", &config->hash_zone_threads);
224 if (result != VDO_SUCCESS) {
225 uninitialize_thread_config(config);
226 return result;
227 }
228
229 result = vdo_allocate(config->bio_thread_count, thread_id_t,
230 "bio thread array", &config->bio_threads);
231 if (result != VDO_SUCCESS) {
232 uninitialize_thread_config(config);
233 return result;
234 }
235
236 if (single) {
237 config->logical_threads[0] = config->thread_count;
238 config->physical_threads[0] = config->thread_count;
239 config->hash_zone_threads[0] = config->thread_count++;
240 } else {
241 config->admin_thread = config->thread_count;
242 config->journal_thread = config->thread_count++;
243 config->packer_thread = config->thread_count++;
244 assign_thread_ids(config, config->logical_threads, counts.logical_zones);
245 assign_thread_ids(config, config->physical_threads, counts.physical_zones);
246 assign_thread_ids(config, config->hash_zone_threads, counts.hash_zones);
247 }
248
249 config->dedupe_thread = config->thread_count++;
250 config->bio_ack_thread =
251 ((counts.bio_ack_threads > 0) ? config->thread_count++ : VDO_INVALID_THREAD_ID);
252 config->cpu_thread = config->thread_count++;
253 assign_thread_ids(config, config->bio_threads, counts.bio_threads);
254 return VDO_SUCCESS;
255 }
256
257 /**
258 * read_geometry_block() - Synchronously read the geometry block from a vdo's underlying block
259 * device.
260 * @vdo: The vdo whose geometry is to be read.
261 *
262 * Return: VDO_SUCCESS or an error code.
263 */
264 static int __must_check read_geometry_block(struct vdo *vdo)
265 {
266 struct vio *vio;
267 char *block;
268 int result;
269
270 result = vdo_allocate(VDO_BLOCK_SIZE, u8, __func__, &block);
271 if (result != VDO_SUCCESS)
272 return result;
273
274 result = create_metadata_vio(vdo, VIO_TYPE_GEOMETRY, VIO_PRIORITY_HIGH, NULL,
275 block, &vio);
276 if (result != VDO_SUCCESS) {
277 vdo_free(block);
278 return result;
279 }
280
281 /*
282 * This is only safe because, having not already loaded the geometry, the vdo's geometry's
283 * bio_offset field is 0, so the fact that vio_reset_bio() will subtract that offset from
284 * the supplied pbn is not a problem.
285 */
286 result = vio_reset_bio(vio, block, NULL, REQ_OP_READ,
287 VDO_GEOMETRY_BLOCK_LOCATION);
288 if (result != VDO_SUCCESS) {
289 free_vio(vdo_forget(vio));
290 vdo_free(block);
291 return result;
292 }
293
294 bio_set_dev(vio->bio, vdo_get_backing_device(vdo));
295 submit_bio_wait(vio->bio);
296 result = blk_status_to_errno(vio->bio->bi_status);
297 free_vio(vdo_forget(vio));
298 if (result != 0) {
299 vdo_log_error_strerror(result, "synchronous read failed");
300 vdo_free(block);
301 return -EIO;
302 }
303
304 result = vdo_parse_geometry_block((u8 *) block, &vdo->geometry);
305 vdo_free(block);
306 return result;
307 }
308
309 static bool get_zone_thread_name(const thread_id_t thread_ids[], zone_count_t count,
310 thread_id_t id, const char *prefix,
311 char *buffer, size_t buffer_length)
312 {
313 if (id >= thread_ids[0]) {
314 thread_id_t index = id - thread_ids[0];
315
316 if (index < count) {
317 snprintf(buffer, buffer_length, "%s%d", prefix, index);
318 return true;
319 }
320 }
321
322 return false;
323 }
324
325 /**
326 * get_thread_name() - Format the name of the worker thread desired to support a given work queue.
327 * @thread_config: The thread configuration.
328 * @thread_id: The thread id.
329 * @buffer: Where to put the formatted name.
330 * @buffer_length: Size of the output buffer.
331 *
332 * The physical layer may add a prefix identifying the product; the output from this function
333 * should just identify the thread.
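 *
 * For example (illustrative, from the format strings below), the single-thread configuration is
 * named "reqQ", the journal thread "journalQ", and zone threads are numbered per zone, e.g.
 * "logQ0", "physQ1", "hashQ0", "bioQ2".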
334 */
335 static void get_thread_name(const struct thread_config *thread_config,
336 thread_id_t thread_id, char *buffer, size_t buffer_length)
337 {
338 if (thread_id == thread_config->journal_thread) {
339 if (thread_config->packer_thread == thread_id) {
340 /*
341 * This is the "single thread" config where one thread is used for the
342 * journal, packer, logical, physical, and hash zones. In that case, it is
343 * known as the "request queue."
344 */
345 snprintf(buffer, buffer_length, "reqQ");
346 return;
347 }
348
349 snprintf(buffer, buffer_length, "journalQ");
350 return;
351 } else if (thread_id == thread_config->admin_thread) {
352 /* Theoretically this could be different from the journal thread. */
353 snprintf(buffer, buffer_length, "adminQ");
354 return;
355 } else if (thread_id == thread_config->packer_thread) {
356 snprintf(buffer, buffer_length, "packerQ");
357 return;
358 } else if (thread_id == thread_config->dedupe_thread) {
359 snprintf(buffer, buffer_length, "dedupeQ");
360 return;
361 } else if (thread_id == thread_config->bio_ack_thread) {
362 snprintf(buffer, buffer_length, "ackQ");
363 return;
364 } else if (thread_id == thread_config->cpu_thread) {
365 snprintf(buffer, buffer_length, "cpuQ");
366 return;
367 }
368
369 if (get_zone_thread_name(thread_config->logical_threads,
370 thread_config->logical_zone_count,
371 thread_id, "logQ", buffer, buffer_length))
372 return;
373
374 if (get_zone_thread_name(thread_config->physical_threads,
375 thread_config->physical_zone_count,
376 thread_id, "physQ", buffer, buffer_length))
377 return;
378
379 if (get_zone_thread_name(thread_config->hash_zone_threads,
380 thread_config->hash_zone_count,
381 thread_id, "hashQ", buffer, buffer_length))
382 return;
383
384 if (get_zone_thread_name(thread_config->bio_threads,
385 thread_config->bio_thread_count,
386 thread_id, "bioQ", buffer, buffer_length))
387 return;
388
389 /* Some sort of misconfiguration? */
390 snprintf(buffer, buffer_length, "reqQ%d", thread_id);
391 }
392
393 /**
394 * vdo_make_thread() - Construct a single vdo work_queue and its associated thread (or threads for
395 * round-robin queues).
396 * @vdo: The vdo which owns the thread.
397 * @thread_id: The id of the thread to create (as determined by the thread_config).
398 * @type: The description of the work queue for this thread.
399 * @queue_count: The number of actual threads/queues contained in the "thread".
400 * @contexts: An array of queue_count contexts, one for each individual queue; may be NULL.
401 *
402 * Each "thread" constructed by this method is represented by a unique thread id in the thread
403 * config, and completions can be enqueued to the queue and run on the threads comprising this
404 * entity.
405 *
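 * A typical call, mirroring the cpu queue setup in vdo_make() below:
 *
 *   result = vdo_make_thread(vdo, vdo->thread_config.cpu_thread, &cpu_q_type,
 *			      config->thread_counts.cpu_threads,
 *			      (void **) vdo->compression_context);
 *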
406 * Return: VDO_SUCCESS or an error.
407 */
408 int vdo_make_thread(struct vdo *vdo, thread_id_t thread_id,
409 const struct vdo_work_queue_type *type,
410 unsigned int queue_count, void *contexts[])
411 {
412 struct vdo_thread *thread = &vdo->threads[thread_id];
413 char queue_name[MAX_VDO_WORK_QUEUE_NAME_LEN];
414
415 if (type == NULL)
416 type = &default_queue_type;
417
418 if (thread->queue != NULL) {
419 return VDO_ASSERT(vdo_work_queue_type_is(thread->queue, type),
420 "already constructed vdo thread %u is of the correct type",
421 thread_id);
422 }
423
424 thread->vdo = vdo;
425 thread->thread_id = thread_id;
426 get_thread_name(&vdo->thread_config, thread_id, queue_name, sizeof(queue_name));
427 return vdo_make_work_queue(vdo->thread_name_prefix, queue_name, thread,
428 type, queue_count, contexts, &thread->queue);
429 }
430
431 /**
432 * register_vdo() - Register a VDO; it must not already be registered.
433 * @vdo: The vdo to register.
434 *
435 * Return: VDO_SUCCESS or an error.
436 */
437 static int register_vdo(struct vdo *vdo)
438 {
439 int result;
440
441 	write_lock(&registry.lock);
442 result = VDO_ASSERT(filter_vdos_locked(vdo_is_equal, vdo) == NULL,
443 "VDO not already registered");
444 if (result == VDO_SUCCESS) {
445 INIT_LIST_HEAD(&vdo->registration);
446 		list_add_tail(&vdo->registration, &registry.links);
447 }
448 	write_unlock(&registry.lock);
449
450 return result;
451 }
452
453 /**
454 * initialize_vdo() - Do the portion of initializing a vdo which will clean up after itself on
455 * error.
456 * @vdo: The vdo being initialized
457 * @config: The configuration of the vdo
458 * @instance: The instance number of the vdo
459 * @reason: The buffer to hold the failure reason on error
460 */
461 static int initialize_vdo(struct vdo *vdo, struct device_config *config,
462 unsigned int instance, char **reason)
463 {
464 int result;
465 zone_count_t i;
466
467 vdo->device_config = config;
468 vdo->starting_sector_offset = config->owning_target->begin;
469 vdo->instance = instance;
470 vdo->allocations_allowed = true;
471 vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_NEW);
472 INIT_LIST_HEAD(&vdo->device_config_list);
473 vdo_initialize_completion(&vdo->admin.completion, vdo, VDO_ADMIN_COMPLETION);
474 init_completion(&vdo->admin.callback_sync);
475 mutex_init(&vdo->stats_mutex);
476 result = read_geometry_block(vdo);
477 if (result != VDO_SUCCESS) {
478 *reason = "Could not load geometry block";
479 return result;
480 }
481
482 result = initialize_thread_config(config->thread_counts, &vdo->thread_config);
483 if (result != VDO_SUCCESS) {
484 *reason = "Cannot create thread configuration";
485 return result;
486 }
487
488 vdo_log_info("zones: %d logical, %d physical, %d hash; total threads: %d",
489 config->thread_counts.logical_zones,
490 config->thread_counts.physical_zones,
491 config->thread_counts.hash_zones, vdo->thread_config.thread_count);
492
493 /* Compression context storage */
494 result = vdo_allocate(config->thread_counts.cpu_threads, char *, "LZ4 context",
495 &vdo->compression_context);
496 if (result != VDO_SUCCESS) {
497 *reason = "cannot allocate LZ4 context";
498 return result;
499 }
500
501 for (i = 0; i < config->thread_counts.cpu_threads; i++) {
502 result = vdo_allocate(LZ4_MEM_COMPRESS, char, "LZ4 context",
503 &vdo->compression_context[i]);
504 if (result != VDO_SUCCESS) {
505 *reason = "cannot allocate LZ4 context";
506 return result;
507 }
508 }
509
510 result = register_vdo(vdo);
511 if (result != VDO_SUCCESS) {
512 *reason = "Cannot add VDO to device registry";
513 return result;
514 }
515
516 vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_INITIALIZED);
517 return result;
518 }
519
520 /**
521 * vdo_make() - Allocate and initialize a vdo.
522 * @instance: Device instantiation counter.
523 * @config: The device configuration.
524 * @reason: The reason for any failure during this call.
525 * @vdo_ptr: A pointer to hold the created vdo.
526 *
527 * Return: VDO_SUCCESS or an error.
528 */
529 int vdo_make(unsigned int instance, struct device_config *config, char **reason,
530 struct vdo **vdo_ptr)
531 {
532 int result;
533 struct vdo *vdo;
534
535 /* Initialize with a generic failure reason to prevent returning garbage. */
536 *reason = "Unspecified error";
537
538 result = vdo_allocate(1, struct vdo, __func__, &vdo);
539 if (result != VDO_SUCCESS) {
540 *reason = "Cannot allocate VDO";
541 return result;
542 }
543
544 result = initialize_vdo(vdo, config, instance, reason);
545 if (result != VDO_SUCCESS) {
546 vdo_destroy(vdo);
547 return result;
548 }
549
550 /* From here on, the caller will clean up if there is an error. */
551 *vdo_ptr = vdo;
552
553 snprintf(vdo->thread_name_prefix, sizeof(vdo->thread_name_prefix),
554 "vdo%u", instance);
555 result = vdo_allocate(vdo->thread_config.thread_count,
556 struct vdo_thread, __func__, &vdo->threads);
557 if (result != VDO_SUCCESS) {
558 *reason = "Cannot allocate thread structures";
559 return result;
560 }
561
562 result = vdo_make_thread(vdo, vdo->thread_config.admin_thread,
563 &default_queue_type, 1, NULL);
564 if (result != VDO_SUCCESS) {
565 *reason = "Cannot make admin thread";
566 return result;
567 }
568
569 result = vdo_make_flusher(vdo);
570 if (result != VDO_SUCCESS) {
571 *reason = "Cannot make flusher zones";
572 return result;
573 }
574
575 result = vdo_make_packer(vdo, DEFAULT_PACKER_BINS, &vdo->packer);
576 if (result != VDO_SUCCESS) {
577 *reason = "Cannot make packer zones";
578 return result;
579 }
580
581 BUG_ON(vdo->device_config->logical_block_size <= 0);
582 BUG_ON(vdo->device_config->owned_device == NULL);
583 result = make_data_vio_pool(vdo, MAXIMUM_VDO_USER_VIOS,
584 MAXIMUM_VDO_USER_VIOS * 3 / 4,
585 &vdo->data_vio_pool);
586 if (result != VDO_SUCCESS) {
587 *reason = "Cannot allocate data_vio pool";
588 return result;
589 }
590
591 result = vdo_make_io_submitter(config->thread_counts.bio_threads,
592 config->thread_counts.bio_rotation_interval,
593 get_data_vio_pool_request_limit(vdo->data_vio_pool),
594 vdo, &vdo->io_submitter);
595 if (result != VDO_SUCCESS) {
596 *reason = "bio submission initialization failed";
597 return result;
598 }
599
600 if (vdo_uses_bio_ack_queue(vdo)) {
601 result = vdo_make_thread(vdo, vdo->thread_config.bio_ack_thread,
602 &bio_ack_q_type,
603 config->thread_counts.bio_ack_threads, NULL);
604 if (result != VDO_SUCCESS) {
605 *reason = "bio ack queue initialization failed";
606 return result;
607 }
608 }
609
610 result = vdo_make_thread(vdo, vdo->thread_config.cpu_thread, &cpu_q_type,
611 config->thread_counts.cpu_threads,
612 (void **) vdo->compression_context);
613 if (result != VDO_SUCCESS) {
614 *reason = "CPU queue initialization failed";
615 return result;
616 }
617
618 return VDO_SUCCESS;
619 }
620
621 static void finish_vdo(struct vdo *vdo)
622 {
623 int i;
624
625 if (vdo->threads == NULL)
626 return;
627
628 vdo_cleanup_io_submitter(vdo->io_submitter);
629 vdo_finish_dedupe_index(vdo->hash_zones);
630
631 for (i = 0; i < vdo->thread_config.thread_count; i++)
632 vdo_finish_work_queue(vdo->threads[i].queue);
633 }
634
635 /**
636 * free_listeners() - Free the list of read-only listeners associated with a thread.
637 * @thread: The thread holding the list to free.
638 */
639 static void free_listeners(struct vdo_thread *thread)
640 {
641 struct read_only_listener *listener, *next;
642
643 for (listener = vdo_forget(thread->listeners); listener != NULL; listener = next) {
644 next = vdo_forget(listener->next);
645 vdo_free(listener);
646 }
647 }
648
649 static void uninitialize_super_block(struct vdo_super_block *super_block)
650 {
651 free_vio_components(&super_block->vio);
652 vdo_free(super_block->buffer);
653 }
654
655 /**
656 * unregister_vdo() - Remove a vdo from the device registry.
657 * @vdo: The vdo to remove.
658 */
659 static void unregister_vdo(struct vdo *vdo)
660 {
661 	write_lock(&registry.lock);
662 if (filter_vdos_locked(vdo_is_equal, vdo) == vdo)
663 list_del_init(&vdo->registration);
664
665 	write_unlock(&registry.lock);
666 }
667
668 /**
669 * vdo_destroy() - Destroy a vdo instance.
670 * @vdo: The vdo to destroy (may be NULL).
671 */
672 void vdo_destroy(struct vdo *vdo)
673 {
674 unsigned int i;
675
676 if (vdo == NULL)
677 return;
678
679 /* A running VDO should never be destroyed without suspending first. */
680 BUG_ON(vdo_get_admin_state(vdo)->normal);
681
682 vdo->allocations_allowed = true;
683
684 finish_vdo(vdo);
685 unregister_vdo(vdo);
686 free_data_vio_pool(vdo->data_vio_pool);
687 vdo_free_io_submitter(vdo_forget(vdo->io_submitter));
688 vdo_free_flusher(vdo_forget(vdo->flusher));
689 vdo_free_packer(vdo_forget(vdo->packer));
690 vdo_free_recovery_journal(vdo_forget(vdo->recovery_journal));
691 vdo_free_slab_depot(vdo_forget(vdo->depot));
692 vdo_uninitialize_layout(&vdo->layout);
693 vdo_uninitialize_layout(&vdo->next_layout);
694 if (vdo->partition_copier)
695 dm_kcopyd_client_destroy(vdo_forget(vdo->partition_copier));
696 uninitialize_super_block(&vdo->super_block);
697 vdo_free_block_map(vdo_forget(vdo->block_map));
698 vdo_free_hash_zones(vdo_forget(vdo->hash_zones));
699 vdo_free_physical_zones(vdo_forget(vdo->physical_zones));
700 vdo_free_logical_zones(vdo_forget(vdo->logical_zones));
701
702 if (vdo->threads != NULL) {
703 for (i = 0; i < vdo->thread_config.thread_count; i++) {
704 free_listeners(&vdo->threads[i]);
705 vdo_free_work_queue(vdo_forget(vdo->threads[i].queue));
706 }
707 vdo_free(vdo_forget(vdo->threads));
708 }
709
710 uninitialize_thread_config(&vdo->thread_config);
711
712 if (vdo->compression_context != NULL) {
713 for (i = 0; i < vdo->device_config->thread_counts.cpu_threads; i++)
714 vdo_free(vdo_forget(vdo->compression_context[i]));
715
716 vdo_free(vdo_forget(vdo->compression_context));
717 }
718 vdo_free(vdo);
719 }
720
721 static int initialize_super_block(struct vdo *vdo, struct vdo_super_block *super_block)
722 {
723 int result;
724
725 result = vdo_allocate(VDO_BLOCK_SIZE, char, "encoded super block",
726 (char **) &vdo->super_block.buffer);
727 if (result != VDO_SUCCESS)
728 return result;
729
730 return allocate_vio_components(vdo, VIO_TYPE_SUPER_BLOCK,
731 VIO_PRIORITY_METADATA, NULL, 1,
732 (char *) super_block->buffer,
733 &vdo->super_block.vio);
734 }
735
736 /**
737 * finish_reading_super_block() - Continue after loading the super block.
738 * @completion: The super block vio.
739 *
740 * This callback is registered in vdo_load_super_block().
741 */
742 static void finish_reading_super_block(struct vdo_completion *completion)
743 {
744 struct vdo_super_block *super_block =
745 container_of(as_vio(completion), struct vdo_super_block, vio);
746
747 vdo_continue_completion(vdo_forget(completion->parent),
748 vdo_decode_super_block(super_block->buffer));
749 }
750
751 /**
752 * handle_super_block_read_error() - Handle an error reading the super block.
753 * @completion: The super block vio.
754 *
755 * This error handler is registered in vdo_load_super_block().
756 */
757 static void handle_super_block_read_error(struct vdo_completion *completion)
758 {
759 vio_record_metadata_io_error(as_vio(completion));
760 finish_reading_super_block(completion);
761 }
762
763 static void read_super_block_endio(struct bio *bio)
764 {
765 struct vio *vio = bio->bi_private;
766 struct vdo_completion *parent = vio->completion.parent;
767
768 continue_vio_after_io(vio, finish_reading_super_block,
769 parent->callback_thread_id);
770 }
771
772 /**
773 * vdo_load_super_block() - Allocate a super block and read its contents from storage.
774 * @vdo: The vdo containing the super block on disk.
775 * @parent: The completion to notify after loading the super block.
776 */
777 void vdo_load_super_block(struct vdo *vdo, struct vdo_completion *parent)
778 {
779 int result;
780
781 result = initialize_super_block(vdo, &vdo->super_block);
782 if (result != VDO_SUCCESS) {
783 vdo_continue_completion(parent, result);
784 return;
785 }
786
787 vdo->super_block.vio.completion.parent = parent;
788 vdo_submit_metadata_vio(&vdo->super_block.vio,
789 vdo_get_data_region_start(vdo->geometry),
790 read_super_block_endio,
791 handle_super_block_read_error,
792 REQ_OP_READ);
793 }
794
795 /**
796 * vdo_get_backing_device() - Get the block device object underlying a vdo.
797 * @vdo: The vdo.
798 *
799 * Return: The vdo's current block device.
800 */
801 struct block_device *vdo_get_backing_device(const struct vdo *vdo)
802 {
803 return vdo->device_config->owned_device->bdev;
804 }
805
806 /**
807 * vdo_get_device_name() - Get the device name associated with the vdo target.
808 * @target: The target device interface.
809 *
810 * Return: The block device name.
811 */
812 const char *vdo_get_device_name(const struct dm_target *target)
813 {
814 return dm_device_name(dm_table_get_md(target->table));
815 }
816
817 /**
818 * vdo_synchronous_flush() - Issue a flush request and wait for it to complete.
819 * @vdo: The vdo.
820 *
821 * Return: VDO_SUCCESS or an error.
822 */
823 int vdo_synchronous_flush(struct vdo *vdo)
824 {
825 int result;
826 struct bio bio;
827
828 bio_init(&bio, vdo_get_backing_device(vdo), NULL, 0,
829 REQ_OP_WRITE | REQ_PREFLUSH);
830 submit_bio_wait(&bio);
831 result = blk_status_to_errno(bio.bi_status);
832
833 atomic64_inc(&vdo->stats.flush_out);
834 if (result != 0) {
835 vdo_log_error_strerror(result, "synchronous flush failed");
836 result = -EIO;
837 }
838
839 bio_uninit(&bio);
840 return result;
841 }
842
843 /**
844 * vdo_get_state() - Get the current state of the vdo.
845 * @vdo: The vdo.
846 *
847 * Context: This method may be called from any thread.
848 *
849 * Return: The current state of the vdo.
850 */
851 enum vdo_state vdo_get_state(const struct vdo *vdo)
852 {
853 enum vdo_state state = atomic_read(&vdo->state);
854
855 /* pairs with barriers where state field is changed */
856 smp_rmb();
857 return state;
858 }
859
860 /**
861 * vdo_set_state() - Set the current state of the vdo.
862 * @vdo: The vdo whose state is to be set.
863 * @state: The new state of the vdo.
864 *
865 * Context: This method may be called from any thread.
866 */
867 void vdo_set_state(struct vdo *vdo, enum vdo_state state)
868 {
869 /* pairs with barrier in vdo_get_state */
870 smp_wmb();
871 atomic_set(&vdo->state, state);
872 }
873
874 /**
875 * vdo_get_admin_state() - Get the admin state of the vdo.
876 * @vdo: The vdo.
877 *
878 * Return: The code for the vdo's current admin state.
879 */
880 const struct admin_state_code *vdo_get_admin_state(const struct vdo *vdo)
881 {
882 return vdo_get_admin_state_code(&vdo->admin.state);
883 }
884
885 /**
886 * record_vdo() - Record the state of the VDO for encoding in the super block.
887 */
888 static void record_vdo(struct vdo *vdo)
889 {
890 /* This is for backwards compatibility. */
891 vdo->states.unused = vdo->geometry.unused;
892 vdo->states.vdo.state = vdo_get_state(vdo);
893 vdo->states.block_map = vdo_record_block_map(vdo->block_map);
894 vdo->states.recovery_journal = vdo_record_recovery_journal(vdo->recovery_journal);
895 vdo->states.slab_depot = vdo_record_slab_depot(vdo->depot);
896 vdo->states.layout = vdo->layout;
897 }
898
899 /**
900 * continue_super_block_parent() - Continue the parent of a super block save operation.
901 * @completion: The super block vio.
902 *
903 * This callback is registered in vdo_save_components().
904 */
905 static void continue_super_block_parent(struct vdo_completion *completion)
906 {
907 vdo_continue_completion(vdo_forget(completion->parent), completion->result);
908 }
909
910 /**
911 * handle_save_error() - Log a super block save error.
912 * @completion: The super block vio.
913 *
914 * This error handler is registered in vdo_save_components().
915 */
916 static void handle_save_error(struct vdo_completion *completion)
917 {
918 struct vdo_super_block *super_block =
919 container_of(as_vio(completion), struct vdo_super_block, vio);
920
921 vio_record_metadata_io_error(&super_block->vio);
922 vdo_log_error_strerror(completion->result, "super block save failed");
923 /*
924 * Mark the super block as unwritable so that we won't attempt to write it again. This
925 * avoids the case where a growth attempt fails writing the super block with the new size,
926 * but the subsequent attempt to write out the read-only state succeeds. In this case,
927 * writes which happened just before the suspend would not be visible if the VDO is
928 * restarted without rebuilding, but, after a read-only rebuild, the effects of those
929 * writes would reappear.
930 */
931 super_block->unwritable = true;
932 completion->callback(completion);
933 }
934
935 static void super_block_write_endio(struct bio *bio)
936 {
937 struct vio *vio = bio->bi_private;
938 struct vdo_completion *parent = vio->completion.parent;
939
940 continue_vio_after_io(vio, continue_super_block_parent,
941 parent->callback_thread_id);
942 }
943
944 /**
945 * vdo_save_components() - Encode the vdo and save the super block asynchronously.
946 * @vdo: The vdo whose state is being saved.
947 * @parent: The completion to notify when the save is complete.
948 */
949 void vdo_save_components(struct vdo *vdo, struct vdo_completion *parent)
950 {
951 struct vdo_super_block *super_block = &vdo->super_block;
952
953 if (super_block->unwritable) {
954 vdo_continue_completion(parent, VDO_READ_ONLY);
955 return;
956 }
957
958 if (super_block->vio.completion.parent != NULL) {
959 vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
960 return;
961 }
962
963 record_vdo(vdo);
964
965 vdo_encode_super_block(super_block->buffer, &vdo->states);
966 super_block->vio.completion.parent = parent;
967 super_block->vio.completion.callback_thread_id = parent->callback_thread_id;
968 vdo_submit_metadata_vio(&super_block->vio,
969 vdo_get_data_region_start(vdo->geometry),
970 super_block_write_endio, handle_save_error,
971 REQ_OP_WRITE | REQ_PREFLUSH | REQ_FUA);
972 }
973
974 /**
975 * vdo_register_read_only_listener() - Register a listener to be notified when the VDO goes
976 * read-only.
977 * @vdo: The vdo to register with.
978 * @listener: The object to notify.
979 * @notification: The function to call to send the notification.
980 * @thread_id: The id of the thread on which to send the notification.
981 *
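 * For example, vdo_enable_read_only_entry() below registers the vdo itself as a listener on the
 * admin thread:
 *
 *   vdo_register_read_only_listener(vdo, vdo, notify_vdo_of_read_only_mode,
 *				     vdo->thread_config.admin_thread);
 *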
982 * Return: VDO_SUCCESS or an error.
983 */
984 int vdo_register_read_only_listener(struct vdo *vdo, void *listener,
985 vdo_read_only_notification_fn notification,
986 thread_id_t thread_id)
987 {
988 struct vdo_thread *thread = &vdo->threads[thread_id];
989 struct read_only_listener *read_only_listener;
990 int result;
991
992 result = VDO_ASSERT(thread_id != vdo->thread_config.dedupe_thread,
993 "read only listener not registered on dedupe thread");
994 if (result != VDO_SUCCESS)
995 return result;
996
997 result = vdo_allocate(1, struct read_only_listener, __func__,
998 &read_only_listener);
999 if (result != VDO_SUCCESS)
1000 return result;
1001
1002 *read_only_listener = (struct read_only_listener) {
1003 .listener = listener,
1004 .notify = notification,
1005 .next = thread->listeners,
1006 };
1007
1008 thread->listeners = read_only_listener;
1009 return VDO_SUCCESS;
1010 }
1011
1012 /**
1013 * notify_vdo_of_read_only_mode() - Notify a vdo that it is going read-only.
1014 * @listener: The vdo.
1015 * @parent: The completion to notify in order to acknowledge the notification.
1016 *
1017 * This will save the read-only state to the super block.
1018 *
1019 * Implements vdo_read_only_notification_fn.
1020 */
1021 static void notify_vdo_of_read_only_mode(void *listener, struct vdo_completion *parent)
1022 {
1023 struct vdo *vdo = listener;
1024
1025 if (vdo_in_read_only_mode(vdo))
1026 vdo_finish_completion(parent);
1027
1028 vdo_set_state(vdo, VDO_READ_ONLY_MODE);
1029 vdo_save_components(vdo, parent);
1030 }
1031
1032 /**
1033 * vdo_enable_read_only_entry() - Enable a vdo to enter read-only mode on errors.
1034 * @vdo: The vdo to enable.
1035 *
1036 * Return: VDO_SUCCESS or an error.
1037 */
1038 int vdo_enable_read_only_entry(struct vdo *vdo)
1039 {
1040 thread_id_t id;
1041 bool is_read_only = vdo_in_read_only_mode(vdo);
1042 struct read_only_notifier *notifier = &vdo->read_only_notifier;
1043
1044 if (is_read_only) {
1045 notifier->read_only_error = VDO_READ_ONLY;
1046 notifier->state = NOTIFIED;
1047 } else {
1048 notifier->state = MAY_NOT_NOTIFY;
1049 }
1050
1051 	spin_lock_init(&notifier->lock);
1052 	vdo_initialize_completion(&notifier->completion, vdo,
1053 VDO_READ_ONLY_MODE_COMPLETION);
1054
1055 for (id = 0; id < vdo->thread_config.thread_count; id++)
1056 vdo->threads[id].is_read_only = is_read_only;
1057
1058 return vdo_register_read_only_listener(vdo, vdo, notify_vdo_of_read_only_mode,
1059 vdo->thread_config.admin_thread);
1060 }
1061
1062 /**
1063 * vdo_wait_until_not_entering_read_only_mode() - Wait until no read-only notifications are in
1064 * progress and prevent any subsequent
1065 * notifications.
1066 * @parent: The completion to notify when no threads are entering read-only mode.
1067 *
1068 * Notifications may be re-enabled by calling vdo_allow_read_only_mode_entry().
1069 */
1070 void vdo_wait_until_not_entering_read_only_mode(struct vdo_completion *parent)
1071 {
1072 struct vdo *vdo = parent->vdo;
1073 struct read_only_notifier *notifier = &vdo->read_only_notifier;
1074
1075 vdo_assert_on_admin_thread(vdo, __func__);
1076
1077 if (notifier->waiter != NULL) {
1078 vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
1079 return;
1080 }
1081
1082 	spin_lock(&notifier->lock);
1083 if (notifier->state == NOTIFYING)
1084 notifier->waiter = parent;
1085 else if (notifier->state == MAY_NOTIFY)
1086 notifier->state = MAY_NOT_NOTIFY;
1087 	spin_unlock(&notifier->lock);
1088
1089 if (notifier->waiter == NULL) {
1090 /*
1091 * A notification was not in progress, and now they are
1092 * disallowed.
1093 */
1094 vdo_launch_completion(parent);
1095 return;
1096 }
1097 }
1098
1099 /**
1100 * as_notifier() - Convert a generic vdo_completion to a read_only_notifier.
1101 * @completion: The completion to convert.
1102 *
1103 * Return: The completion as a read_only_notifier.
1104 */
1105 static inline struct read_only_notifier *as_notifier(struct vdo_completion *completion)
1106 {
1107 vdo_assert_completion_type(completion, VDO_READ_ONLY_MODE_COMPLETION);
1108 return container_of(completion, struct read_only_notifier, completion);
1109 }
1110
1111 /**
1112 * finish_entering_read_only_mode() - Complete the process of entering read only mode.
1113 * @completion: The read-only mode completion.
1114 */
1115 static void finish_entering_read_only_mode(struct vdo_completion *completion)
1116 {
1117 struct read_only_notifier *notifier = as_notifier(completion);
1118
1119 vdo_assert_on_admin_thread(completion->vdo, __func__);
1120
1121 	spin_lock(&notifier->lock);
1122 notifier->state = NOTIFIED;
1123 	spin_unlock(&notifier->lock);
1124
1125 if (notifier->waiter != NULL)
1126 vdo_continue_completion(vdo_forget(notifier->waiter),
1127 completion->result);
1128 }
1129
1130 /**
1131 * make_thread_read_only() - Inform each thread that the VDO is in read-only mode.
1132 * @completion: The read-only mode completion.
1133 */
1134 static void make_thread_read_only(struct vdo_completion *completion)
1135 {
1136 struct vdo *vdo = completion->vdo;
1137 thread_id_t thread_id = completion->callback_thread_id;
1138 struct read_only_notifier *notifier = as_notifier(completion);
1139 struct read_only_listener *listener = completion->parent;
1140
1141 if (listener == NULL) {
1142 /* This is the first call on this thread */
1143 struct vdo_thread *thread = &vdo->threads[thread_id];
1144
1145 thread->is_read_only = true;
1146 listener = thread->listeners;
1147 if (thread_id == 0)
1148 vdo_log_error_strerror(READ_ONCE(notifier->read_only_error),
1149 "Unrecoverable error, entering read-only mode");
1150 } else {
1151 /* We've just finished notifying a listener */
1152 listener = listener->next;
1153 }
1154
1155 if (listener != NULL) {
1156 /* We have a listener to notify */
1157 vdo_prepare_completion(completion, make_thread_read_only,
1158 make_thread_read_only, thread_id,
1159 listener);
1160 listener->notify(listener->listener, completion);
1161 return;
1162 }
1163
1164 /* We're done with this thread */
1165 if (++thread_id == vdo->thread_config.dedupe_thread) {
1166 /*
1167 * We don't want to notify the dedupe thread since it may be
1168 * blocked rebuilding the index.
1169 */
1170 thread_id++;
1171 }
1172
1173 if (thread_id >= vdo->thread_config.thread_count) {
1174 /* There are no more threads */
1175 vdo_prepare_completion(completion, finish_entering_read_only_mode,
1176 finish_entering_read_only_mode,
1177 vdo->thread_config.admin_thread, NULL);
1178 } else {
1179 vdo_prepare_completion(completion, make_thread_read_only,
1180 make_thread_read_only, thread_id, NULL);
1181 }
1182
1183 vdo_launch_completion(completion);
1184 }
1185
1186 /**
1187 * vdo_allow_read_only_mode_entry() - Allow the notifier to put the VDO into read-only mode,
1188 * reversing the effects of
1189 * vdo_wait_until_not_entering_read_only_mode().
1190 * @parent: The object to notify once the operation is complete.
1191 *
1192 * If some thread tried to put the vdo into read-only mode while notifications were disallowed, it
1193 * will be done when this method is called. If that happens, the parent will not be notified until
1194 * the vdo has actually entered read-only mode and attempted to save the super block.
1195 *
1196 * Context: This method may only be called from the admin thread.
1197 */
1198 void vdo_allow_read_only_mode_entry(struct vdo_completion *parent)
1199 {
1200 struct vdo *vdo = parent->vdo;
1201 struct read_only_notifier *notifier = &vdo->read_only_notifier;
1202
1203 vdo_assert_on_admin_thread(vdo, __func__);
1204
1205 if (notifier->waiter != NULL) {
1206 vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
1207 return;
1208 }
1209
1210 	spin_lock(&notifier->lock);
1211 if (notifier->state == MAY_NOT_NOTIFY) {
1212 if (notifier->read_only_error == VDO_SUCCESS) {
1213 notifier->state = MAY_NOTIFY;
1214 } else {
1215 notifier->state = NOTIFYING;
1216 notifier->waiter = parent;
1217 }
1218 }
1219 	spin_unlock(&notifier->lock);
1220
1221 if (notifier->waiter == NULL) {
1222 /* We're done */
1223 vdo_launch_completion(parent);
1224 return;
1225 }
1226
1227 /* Do the pending notification. */
1228 	make_thread_read_only(&notifier->completion);
1229 }
1230
1231 /**
1232 * vdo_enter_read_only_mode() - Put a VDO into read-only mode and save the read-only state in the
1233 * super block.
1234 * @vdo: The vdo.
1235 * @error_code: The error which caused the VDO to enter read-only mode.
1236 *
1237 * This method is a no-op if the VDO is already read-only.
1238 */
1239 void vdo_enter_read_only_mode(struct vdo *vdo, int error_code)
1240 {
1241 bool notify = false;
1242 thread_id_t thread_id = vdo_get_callback_thread_id();
1243 struct read_only_notifier *notifier = &vdo->read_only_notifier;
1244 struct vdo_thread *thread;
1245
1246 if (thread_id != VDO_INVALID_THREAD_ID) {
1247 thread = &vdo->threads[thread_id];
1248 if (thread->is_read_only) {
1249 /* This thread has already gone read-only. */
1250 return;
1251 }
1252
1253 /* Record for this thread that the VDO is read-only. */
1254 thread->is_read_only = true;
1255 }
1256
1257 	spin_lock(&notifier->lock);
1258 if (notifier->read_only_error == VDO_SUCCESS) {
1259 WRITE_ONCE(notifier->read_only_error, error_code);
1260 if (notifier->state == MAY_NOTIFY) {
1261 notifier->state = NOTIFYING;
1262 notify = true;
1263 }
1264 }
1265 	spin_unlock(&notifier->lock);
1266
1267 if (!notify) {
1268 /* The notifier is already aware of a read-only error */
1269 return;
1270 }
1271
1272 /* Initiate a notification starting on the lowest numbered thread. */
1273 	vdo_launch_completion_callback(&notifier->completion, make_thread_read_only, 0);
1274 }
1275
1276 /**
1277 * vdo_is_read_only() - Check whether the VDO is read-only.
1278 * @vdo: The vdo.
1279 *
1280 * Return: true if the vdo is read-only.
1281 *
1282 * This method may be called from any thread, as opposed to examining the VDO's state field which
1283 * is only safe to check from the admin thread.
1284 */
1285 bool vdo_is_read_only(struct vdo *vdo)
1286 {
1287 return vdo->threads[vdo_get_callback_thread_id()].is_read_only;
1288 }
1289
1290 /**
1291 * vdo_in_read_only_mode() - Check whether a vdo is in read-only mode.
1292 * @vdo: The vdo to query.
1293 *
1294 * Return: true if the vdo is in read-only mode.
1295 */
1296 bool vdo_in_read_only_mode(const struct vdo *vdo)
1297 {
1298 return (vdo_get_state(vdo) == VDO_READ_ONLY_MODE);
1299 }
1300
1301 /**
1302 * vdo_in_recovery_mode() - Check whether the vdo is in recovery mode.
1303 * @vdo: The vdo to query.
1304 *
1305 * Return: true if the vdo is in recovery mode.
1306 */
1307 bool vdo_in_recovery_mode(const struct vdo *vdo)
1308 {
1309 return (vdo_get_state(vdo) == VDO_RECOVERING);
1310 }
1311
1312 /**
1313 * vdo_enter_recovery_mode() - Put the vdo into recovery mode.
1314 * @vdo: The vdo.
1315 */
1316 void vdo_enter_recovery_mode(struct vdo *vdo)
1317 {
1318 vdo_assert_on_admin_thread(vdo, __func__);
1319
1320 if (vdo_in_read_only_mode(vdo))
1321 return;
1322
1323 vdo_log_info("Entering recovery mode");
1324 vdo_set_state(vdo, VDO_RECOVERING);
1325 }
1326
1327 /**
1328 * complete_synchronous_action() - Signal the waiting thread that a synchronous action is complete.
1329 * @completion: The sync completion.
1330 */
1331 static void complete_synchronous_action(struct vdo_completion *completion)
1332 {
1333 vdo_assert_completion_type(completion, VDO_SYNC_COMPLETION);
1334 complete(&(container_of(completion, struct sync_completion,
1335 vdo_completion)->completion));
1336 }
1337
1338 /**
1339 * perform_synchronous_action() - Launch an action on a VDO thread and wait for it to complete.
1340 * @vdo: The vdo.
1341 * @action: The callback to launch.
1342 * @thread_id: The thread on which to run the action.
1343 * @parent: The parent of the sync completion (may be NULL).
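 *
 * Used below by vdo_set_compressing() and vdo_fetch_statistics(), for example:
 *
 *   perform_synchronous_action(vdo, vdo_fetch_statistics_callback,
 *				vdo->thread_config.admin_thread, stats);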
1344 */
1345 static int perform_synchronous_action(struct vdo *vdo, vdo_action_fn action,
1346 thread_id_t thread_id, void *parent)
1347 {
1348 struct sync_completion sync;
1349
1350 vdo_initialize_completion(&sync.vdo_completion, vdo, VDO_SYNC_COMPLETION);
1351 init_completion(&sync.completion);
1352 sync.vdo_completion.parent = parent;
1353 vdo_launch_completion_callback(&sync.vdo_completion, action, thread_id);
1354 wait_for_completion(&sync.completion);
1355 return sync.vdo_completion.result;
1356 }
1357
1358 /**
1359 * set_compression_callback() - Callback to turn compression on or off.
1360 * @completion: The completion.
1361 */
1362 static void set_compression_callback(struct vdo_completion *completion)
1363 {
1364 struct vdo *vdo = completion->vdo;
1365 bool *enable = completion->parent;
1366 bool was_enabled = vdo_get_compressing(vdo);
1367
1368 if (*enable != was_enabled) {
1369 WRITE_ONCE(vdo->compressing, *enable);
1370 if (was_enabled) {
1371 /* Signal the packer to flush since compression has been disabled. */
1372 vdo_flush_packer(vdo->packer);
1373 }
1374 }
1375
1376 vdo_log_info("compression is %s", (*enable ? "enabled" : "disabled"));
1377 *enable = was_enabled;
1378 complete_synchronous_action(completion);
1379 }
1380
1381 /**
1382 * vdo_set_compressing() - Turn compression on or off.
1383 * @vdo: The vdo.
1384 * @enable: Whether to enable or disable compression.
1385 *
1386 * Return: Whether compression was previously on or off.
1387 */
1388 bool vdo_set_compressing(struct vdo *vdo, bool enable)
1389 {
1390 perform_synchronous_action(vdo, set_compression_callback,
1391 vdo->thread_config.packer_thread,
1392 &enable);
1393 return enable;
1394 }
1395
1396 /**
1397 * vdo_get_compressing() - Get whether compression is enabled in a vdo.
1398 * @vdo: The vdo.
1399 *
1400 * Return: State of compression.
1401 */
1402 bool vdo_get_compressing(struct vdo *vdo)
1403 {
1404 return READ_ONCE(vdo->compressing);
1405 }
1406
1407 static size_t get_block_map_cache_size(const struct vdo *vdo)
1408 {
1409 return ((size_t) vdo->device_config->cache_size) * VDO_BLOCK_SIZE;
1410 }
1411
1412 static struct error_statistics __must_check get_vdo_error_statistics(const struct vdo *vdo)
1413 {
1414 /*
1415 * The error counts can be incremented from arbitrary threads and so must be incremented
1416 * atomically, but they are just statistics with no semantics that could rely on memory
1417 * order, so unfenced reads are sufficient.
1418 */
1419 const struct atomic_statistics *atoms = &vdo->stats;
1420
1421 return (struct error_statistics) {
1422 .invalid_advice_pbn_count = atomic64_read(&atoms->invalid_advice_pbn_count),
1423 .no_space_error_count = atomic64_read(&atoms->no_space_error_count),
1424 .read_only_error_count = atomic64_read(&atoms->read_only_error_count),
1425 };
1426 }
1427
1428 static void copy_bio_stat(struct bio_stats *b, const struct atomic_bio_stats *a)
1429 {
1430 b->read = atomic64_read(&a->read);
1431 b->write = atomic64_read(&a->write);
1432 b->discard = atomic64_read(&a->discard);
1433 b->flush = atomic64_read(&a->flush);
1434 b->empty_flush = atomic64_read(&a->empty_flush);
1435 b->fua = atomic64_read(&a->fua);
1436 }
1437
1438 static struct bio_stats subtract_bio_stats(struct bio_stats minuend,
1439 struct bio_stats subtrahend)
1440 {
1441 return (struct bio_stats) {
1442 .read = minuend.read - subtrahend.read,
1443 .write = minuend.write - subtrahend.write,
1444 .discard = minuend.discard - subtrahend.discard,
1445 .flush = minuend.flush - subtrahend.flush,
1446 .empty_flush = minuend.empty_flush - subtrahend.empty_flush,
1447 .fua = minuend.fua - subtrahend.fua,
1448 };
1449 }
1450
1451 /**
1452 * vdo_get_physical_blocks_allocated() - Get the number of physical blocks in use by user data.
1453 * @vdo: The vdo.
1454 *
1455 * Return: The number of blocks allocated for user data.
1456 */
1457 static block_count_t __must_check vdo_get_physical_blocks_allocated(const struct vdo *vdo)
1458 {
1459 return (vdo_get_slab_depot_allocated_blocks(vdo->depot) -
1460 vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal));
1461 }
1462
1463 /**
1464 * vdo_get_physical_blocks_overhead() - Get the number of physical blocks used by vdo metadata.
1465 * @vdo: The vdo.
1466 *
1467 * Return: The number of overhead blocks.
1468 */
1469 static block_count_t __must_check vdo_get_physical_blocks_overhead(const struct vdo *vdo)
1470 {
1471 /*
1472 * config.physical_blocks is mutated during resize and is in a packed structure,
1473 * but resize runs on admin thread.
1474 * TODO: Verify that this is always safe.
1475 */
1476 return (vdo->states.vdo.config.physical_blocks -
1477 vdo_get_slab_depot_data_blocks(vdo->depot) +
1478 vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal));
1479 }
1480
1481 static const char *vdo_describe_state(enum vdo_state state)
1482 {
1483 /* These strings should all fit in the 15 chars of VDOStatistics.mode. */
1484 switch (state) {
1485 case VDO_RECOVERING:
1486 return "recovering";
1487
1488 case VDO_READ_ONLY_MODE:
1489 return "read-only";
1490
1491 default:
1492 return "normal";
1493 }
1494 }
1495
1496 /**
1497 * get_vdo_statistics() - Populate a vdo_statistics structure on the admin thread.
1498 * @vdo: The vdo.
1499 * @stats: The statistics structure to populate.
1500 */
1501 static void get_vdo_statistics(const struct vdo *vdo, struct vdo_statistics *stats)
1502 {
1503 struct recovery_journal *journal = vdo->recovery_journal;
1504 enum vdo_state state = vdo_get_state(vdo);
1505
1506 vdo_assert_on_admin_thread(vdo, __func__);
1507
1508 /* start with a clean slate */
1509 memset(stats, 0, sizeof(struct vdo_statistics));
1510
1511 /*
1512 * These are immutable properties of the vdo object, so it is safe to query them from any
1513 * thread.
1514 */
1515 stats->version = STATISTICS_VERSION;
1516 stats->logical_blocks = vdo->states.vdo.config.logical_blocks;
1517 /*
1518 * config.physical_blocks is mutated during resize and is in a packed structure, but resize
1519 * runs on the admin thread.
1520 * TODO: verify that this is always safe
1521 */
1522 stats->physical_blocks = vdo->states.vdo.config.physical_blocks;
1523 stats->block_size = VDO_BLOCK_SIZE;
1524 stats->complete_recoveries = vdo->states.vdo.complete_recoveries;
1525 stats->read_only_recoveries = vdo->states.vdo.read_only_recoveries;
1526 stats->block_map_cache_size = get_block_map_cache_size(vdo);
1527
1528 /* The callees are responsible for thread-safety. */
1529 stats->data_blocks_used = vdo_get_physical_blocks_allocated(vdo);
1530 stats->overhead_blocks_used = vdo_get_physical_blocks_overhead(vdo);
1531 stats->logical_blocks_used = vdo_get_recovery_journal_logical_blocks_used(journal);
1532 vdo_get_slab_depot_statistics(vdo->depot, stats);
1533 stats->journal = vdo_get_recovery_journal_statistics(journal);
1534 stats->packer = vdo_get_packer_statistics(vdo->packer);
1535 stats->block_map = vdo_get_block_map_statistics(vdo->block_map);
1536 vdo_get_dedupe_statistics(vdo->hash_zones, stats);
1537 stats->errors = get_vdo_error_statistics(vdo);
1538 stats->in_recovery_mode = (state == VDO_RECOVERING);
1539 snprintf(stats->mode, sizeof(stats->mode), "%s", vdo_describe_state(state));
1540
1541 stats->instance = vdo->instance;
1542 stats->current_vios_in_progress = get_data_vio_pool_active_requests(vdo->data_vio_pool);
1543 stats->max_vios = get_data_vio_pool_maximum_requests(vdo->data_vio_pool);
1544
1545 stats->flush_out = atomic64_read(&vdo->stats.flush_out);
1546 stats->logical_block_size = vdo->device_config->logical_block_size;
1547 copy_bio_stat(&stats->bios_in, &vdo->stats.bios_in);
1548 copy_bio_stat(&stats->bios_in_partial, &vdo->stats.bios_in_partial);
1549 copy_bio_stat(&stats->bios_out, &vdo->stats.bios_out);
1550 copy_bio_stat(&stats->bios_meta, &vdo->stats.bios_meta);
1551 copy_bio_stat(&stats->bios_journal, &vdo->stats.bios_journal);
1552 copy_bio_stat(&stats->bios_page_cache, &vdo->stats.bios_page_cache);
1553 copy_bio_stat(&stats->bios_out_completed, &vdo->stats.bios_out_completed);
1554 copy_bio_stat(&stats->bios_meta_completed, &vdo->stats.bios_meta_completed);
1555 copy_bio_stat(&stats->bios_journal_completed,
1556 &vdo->stats.bios_journal_completed);
1557 copy_bio_stat(&stats->bios_page_cache_completed,
1558 &vdo->stats.bios_page_cache_completed);
1559 copy_bio_stat(&stats->bios_acknowledged, &vdo->stats.bios_acknowledged);
1560 copy_bio_stat(&stats->bios_acknowledged_partial, &vdo->stats.bios_acknowledged_partial);
1561 stats->bios_in_progress =
1562 subtract_bio_stats(stats->bios_in, stats->bios_acknowledged);
1563 vdo_get_memory_stats(&stats->memory_usage.bytes_used,
1564 &stats->memory_usage.peak_bytes_used);
1565 }
1566
1567 /**
1568 * vdo_fetch_statistics_callback() - Action to populate a vdo_statistics
1569 * structure on the admin thread.
1570 * @completion: The completion.
1571 *
1572 * This callback is registered in vdo_fetch_statistics().
1573 */
1574 static void vdo_fetch_statistics_callback(struct vdo_completion *completion)
1575 {
1576 get_vdo_statistics(completion->vdo, completion->parent);
1577 complete_synchronous_action(completion);
1578 }
1579
1580 /**
1581 * vdo_fetch_statistics() - Fetch statistics on the correct thread.
1582 * @vdo: The vdo.
1583 * @stats: The vdo statistics are returned here.
1584 */
1585 void vdo_fetch_statistics(struct vdo *vdo, struct vdo_statistics *stats)
1586 {
1587 perform_synchronous_action(vdo, vdo_fetch_statistics_callback,
1588 vdo->thread_config.admin_thread, stats);
1589 }
1590
1591 /**
1592 * vdo_get_callback_thread_id() - Get the id of the callback thread on which a completion is
1593 * currently running.
1594 *
1595 * Return: The current thread ID, or -1 if no such thread.
1596 */
1597 thread_id_t vdo_get_callback_thread_id(void)
1598 {
1599 struct vdo_work_queue *queue = vdo_get_current_work_queue();
1600 struct vdo_thread *thread;
1601 thread_id_t thread_id;
1602
1603 if (queue == NULL)
1604 return VDO_INVALID_THREAD_ID;
1605
1606 thread = vdo_get_work_queue_owner(queue);
1607 thread_id = thread->thread_id;
1608
1609 if (PARANOID_THREAD_CONSISTENCY_CHECKS) {
1610 BUG_ON(thread_id >= thread->vdo->thread_config.thread_count);
1611 BUG_ON(thread != &thread->vdo->threads[thread_id]);
1612 }
1613
1614 return thread_id;
1615 }
1616
1617 /**
1618 * vdo_dump_status() - Dump status information about a vdo to the log for debugging.
1619 * @vdo: The vdo to dump.
1620 */
1621 void vdo_dump_status(const struct vdo *vdo)
1622 {
1623 zone_count_t zone;
1624
1625 vdo_dump_flusher(vdo->flusher);
1626 vdo_dump_recovery_journal_statistics(vdo->recovery_journal);
1627 vdo_dump_packer(vdo->packer);
1628 vdo_dump_slab_depot(vdo->depot);
1629
1630 for (zone = 0; zone < vdo->thread_config.logical_zone_count; zone++)
1631 vdo_dump_logical_zone(&vdo->logical_zones->zones[zone]);
1632
1633 for (zone = 0; zone < vdo->thread_config.physical_zone_count; zone++)
1634 vdo_dump_physical_zone(&vdo->physical_zones->zones[zone]);
1635
1636 vdo_dump_hash_zones(vdo->hash_zones);
1637 }
1638
1639 /**
1640 * vdo_assert_on_admin_thread() - Assert that we are running on the admin thread.
1641 * @vdo: The vdo.
1642 * @name: The name of the function which should be running on the admin thread (for logging).
1643 */
1644 void vdo_assert_on_admin_thread(const struct vdo *vdo, const char *name)
1645 {
1646 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.admin_thread),
1647 "%s called on admin thread", name);
1648 }
1649
1650 /**
1651 * vdo_assert_on_logical_zone_thread() - Assert that this function was called on the specified
1652 * logical zone thread.
1653 * @vdo: The vdo.
1654 * @logical_zone: The number of the logical zone.
1655 * @name: The name of the calling function.
1656 */
1657 void vdo_assert_on_logical_zone_thread(const struct vdo *vdo, zone_count_t logical_zone,
1658 const char *name)
1659 {
1660 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() ==
1661 vdo->thread_config.logical_threads[logical_zone]),
1662 "%s called on logical thread", name);
1663 }
1664
1665 /**
1666 * vdo_assert_on_physical_zone_thread() - Assert that this function was called on the specified
1667 * physical zone thread.
1668 * @vdo: The vdo.
1669 * @physical_zone: The number of the physical zone.
1670 * @name: The name of the calling function.
1671 */
1672 void vdo_assert_on_physical_zone_thread(const struct vdo *vdo,
1673 zone_count_t physical_zone, const char *name)
1674 {
1675 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() ==
1676 vdo->thread_config.physical_threads[physical_zone]),
1677 "%s called on physical thread", name);
1678 }
1679
1680 /**
1681 * vdo_get_physical_zone() - Get the physical zone responsible for a given physical block number.
1682 * @vdo: The vdo containing the physical zones.
1683 * @pbn: The PBN of the data block.
1684 * @zone_ptr: A pointer to return the physical zone.
1685 *
1686 * Gets the physical zone responsible for a given physical block number of a data block in this vdo
1687 * instance, or of the zero block (for which a NULL zone is returned). For any other block number
1688 * that is not in the range of valid data block numbers in any slab, an error will be returned.
1689 * This function is safe to call on invalid block numbers; it will not put the vdo into read-only
1690 * mode.
1691 *
1692 * Return: VDO_SUCCESS or VDO_OUT_OF_RANGE if the block number is invalid or an error code for any
1693 * other failure.
1694 */
1695 int vdo_get_physical_zone(const struct vdo *vdo, physical_block_number_t pbn,
1696 struct physical_zone **zone_ptr)
1697 {
1698 struct vdo_slab *slab;
1699 int result;
1700
1701 if (pbn == VDO_ZERO_BLOCK) {
1702 *zone_ptr = NULL;
1703 return VDO_SUCCESS;
1704 }
1705
1706 /*
1707 * Used because it does a more restrictive bounds check than vdo_get_slab(), and done first
1708 * because it won't trigger read-only mode on an invalid PBN.
1709 */
1710 if (!vdo_is_physical_data_block(vdo->depot, pbn))
1711 return VDO_OUT_OF_RANGE;
1712
1713 /* With the PBN already checked, we should always succeed in finding a slab. */
1714 slab = vdo_get_slab(vdo->depot, pbn);
1715 result = VDO_ASSERT(slab != NULL, "vdo_get_slab must succeed on all valid PBNs");
1716 if (result != VDO_SUCCESS)
1717 return result;
1718
1719 *zone_ptr = &vdo->physical_zones->zones[slab->allocator->zone_number];
1720 return VDO_SUCCESS;
1721 }
1722