1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * Copyright 2023 Red Hat 4 */ 5 6 /* 7 * This file contains the main entry points for normal operations on a vdo as well as functions for 8 * constructing and destroying vdo instances (in memory). 9 */ 10 11 /** 12 * DOC: 13 * 14 * A read_only_notifier has a single completion which is used to perform read-only notifications, 15 * however, vdo_enter_read_only_mode() may be called from any thread. A pair of fields, protected 16 * by a spinlock, are used to control the read-only mode entry process. The first field holds the 17 * read-only error. The second is the state field, which may hold any of the four special values 18 * enumerated here. 19 * 20 * When vdo_enter_read_only_mode() is called from some vdo thread, if the read_only_error field 21 * already contains an error (i.e. its value is not VDO_SUCCESS), then some other error has already 22 * initiated the read-only process, and nothing more is done. Otherwise, the new error is stored in 23 * the read_only_error field, and the state field is consulted. If the state is MAY_NOTIFY, it is 24 * set to NOTIFYING, and the notification process begins. If the state is MAY_NOT_NOTIFY, then 25 * notifications are currently disallowed, generally due to the vdo being suspended. In this case, 26 * the nothing more will be done until the vdo is resumed, at which point the notification will be 27 * performed. In any other case, the vdo is already read-only, and there is nothing more to do. 28 */ 29 30 #include "vdo.h" 31 32 #include <linux/completion.h> 33 #include <linux/device-mapper.h> 34 #include <linux/lz4.h> 35 #include <linux/mutex.h> 36 #include <linux/spinlock.h> 37 #include <linux/string.h> 38 #include <linux/types.h> 39 #include <linux/uuid.h> 40 41 #include "logger.h" 42 #include "memory-alloc.h" 43 #include "permassert.h" 44 #include "string-utils.h" 45 46 #include "block-map.h" 47 #include "completion.h" 48 #include "data-vio.h" 49 #include "dedupe.h" 50 #include "encodings.h" 51 #include "funnel-workqueue.h" 52 #include "io-submitter.h" 53 #include "logical-zone.h" 54 #include "packer.h" 55 #include "physical-zone.h" 56 #include "recovery-journal.h" 57 #include "slab-depot.h" 58 #include "statistics.h" 59 #include "status-codes.h" 60 #include "time-utils.h" 61 #include "vio.h" 62 63 #define PARANOID_THREAD_CONSISTENCY_CHECKS 0 64 65 struct sync_completion { 66 struct vdo_completion vdo_completion; 67 struct completion completion; 68 }; 69 70 /* A linked list is adequate for the small number of entries we expect. */ 71 struct device_registry { 72 struct list_head links; 73 /* TODO: Convert to rcu per kernel recommendation. */ 74 rwlock_t lock; 75 }; 76 77 static struct device_registry registry; 78 79 /** 80 * vdo_initialize_device_registry_once() - Initialize the necessary structures for the device 81 * registry. 82 */ 83 void vdo_initialize_device_registry_once(void) 84 { 85 INIT_LIST_HEAD(®istry.links); 86 rwlock_init(®istry.lock); 87 } 88 89 /** vdo_is_equal() - Implements vdo_filter_fn. */ 90 static bool vdo_is_equal(struct vdo *vdo, const void *context) 91 { 92 return (vdo == context); 93 } 94 95 /** 96 * filter_vdos_locked() - Find a vdo in the registry if it exists there. 97 * @filter: The filter function to apply to devices. 98 * @context: A bit of context to provide the filter. 99 * 100 * Context: Must be called holding the lock. 101 * 102 * Return: the vdo object found, if any. 103 */ 104 static struct vdo * __must_check filter_vdos_locked(vdo_filter_fn filter, 105 const void *context) 106 { 107 struct vdo *vdo; 108 109 list_for_each_entry(vdo, ®istry.links, registration) { 110 if (filter(vdo, context)) 111 return vdo; 112 } 113 114 return NULL; 115 } 116 117 /** 118 * vdo_find_matching() - Find and return the first (if any) vdo matching a given filter function. 119 * @filter: The filter function to apply to vdos. 120 * @context: A bit of context to provide the filter. 121 */ 122 struct vdo *vdo_find_matching(vdo_filter_fn filter, const void *context) 123 { 124 struct vdo *vdo; 125 126 read_lock(®istry.lock); 127 vdo = filter_vdos_locked(filter, context); 128 read_unlock(®istry.lock); 129 130 return vdo; 131 } 132 133 static void start_vdo_request_queue(void *ptr) 134 { 135 struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue()); 136 137 vdo_register_allocating_thread(&thread->allocating_thread, 138 &thread->vdo->allocations_allowed); 139 } 140 141 static void finish_vdo_request_queue(void *ptr) 142 { 143 vdo_unregister_allocating_thread(); 144 } 145 146 static const struct vdo_work_queue_type default_queue_type = { 147 .start = start_vdo_request_queue, 148 .finish = finish_vdo_request_queue, 149 .max_priority = VDO_DEFAULT_Q_MAX_PRIORITY, 150 .default_priority = VDO_DEFAULT_Q_COMPLETION_PRIORITY, 151 }; 152 153 static const struct vdo_work_queue_type bio_ack_q_type = { 154 .start = NULL, 155 .finish = NULL, 156 .max_priority = BIO_ACK_Q_MAX_PRIORITY, 157 .default_priority = BIO_ACK_Q_ACK_PRIORITY, 158 }; 159 160 static const struct vdo_work_queue_type cpu_q_type = { 161 .start = NULL, 162 .finish = NULL, 163 .max_priority = CPU_Q_MAX_PRIORITY, 164 .default_priority = CPU_Q_MAX_PRIORITY, 165 }; 166 167 static void uninitialize_thread_config(struct thread_config *config) 168 { 169 vdo_free(vdo_forget(config->logical_threads)); 170 vdo_free(vdo_forget(config->physical_threads)); 171 vdo_free(vdo_forget(config->hash_zone_threads)); 172 vdo_free(vdo_forget(config->bio_threads)); 173 memset(config, 0, sizeof(struct thread_config)); 174 } 175 176 static void assign_thread_ids(struct thread_config *config, 177 thread_id_t thread_ids[], zone_count_t count) 178 { 179 zone_count_t zone; 180 181 for (zone = 0; zone < count; zone++) 182 thread_ids[zone] = config->thread_count++; 183 } 184 185 /** 186 * initialize_thread_config() - Initialize the thread mapping 187 * @counts: The number and types of threads to create. 188 * @config: The thread_config to initialize. 189 * 190 * If the logical, physical, and hash zone counts are all 0, a single thread will be shared by all 191 * three plus the packer and recovery journal. Otherwise, there must be at least one of each type, 192 * and each will have its own thread, as will the packer and recovery journal. 193 * 194 * Return: VDO_SUCCESS or an error. 195 */ 196 static int __must_check initialize_thread_config(struct thread_count_config counts, 197 struct thread_config *config) 198 { 199 int result; 200 bool single = ((counts.logical_zones + counts.physical_zones + counts.hash_zones) == 0); 201 202 config->bio_thread_count = counts.bio_threads; 203 if (single) { 204 config->logical_zone_count = 1; 205 config->physical_zone_count = 1; 206 config->hash_zone_count = 1; 207 } else { 208 config->logical_zone_count = counts.logical_zones; 209 config->physical_zone_count = counts.physical_zones; 210 config->hash_zone_count = counts.hash_zones; 211 } 212 213 result = vdo_allocate(config->logical_zone_count, "logical thread array", 214 &config->logical_threads); 215 if (result != VDO_SUCCESS) { 216 uninitialize_thread_config(config); 217 return result; 218 } 219 220 result = vdo_allocate(config->physical_zone_count, "physical thread array", 221 &config->physical_threads); 222 if (result != VDO_SUCCESS) { 223 uninitialize_thread_config(config); 224 return result; 225 } 226 227 result = vdo_allocate(config->hash_zone_count, "hash thread array", 228 &config->hash_zone_threads); 229 if (result != VDO_SUCCESS) { 230 uninitialize_thread_config(config); 231 return result; 232 } 233 234 result = vdo_allocate(config->bio_thread_count, "bio thread array", &config->bio_threads); 235 if (result != VDO_SUCCESS) { 236 uninitialize_thread_config(config); 237 return result; 238 } 239 240 if (single) { 241 config->logical_threads[0] = config->thread_count; 242 config->physical_threads[0] = config->thread_count; 243 config->hash_zone_threads[0] = config->thread_count++; 244 } else { 245 config->admin_thread = config->thread_count; 246 config->journal_thread = config->thread_count++; 247 config->packer_thread = config->thread_count++; 248 assign_thread_ids(config, config->logical_threads, counts.logical_zones); 249 assign_thread_ids(config, config->physical_threads, counts.physical_zones); 250 assign_thread_ids(config, config->hash_zone_threads, counts.hash_zones); 251 } 252 253 config->dedupe_thread = config->thread_count++; 254 config->bio_ack_thread = 255 ((counts.bio_ack_threads > 0) ? config->thread_count++ : VDO_INVALID_THREAD_ID); 256 config->cpu_thread = config->thread_count++; 257 assign_thread_ids(config, config->bio_threads, counts.bio_threads); 258 return VDO_SUCCESS; 259 } 260 261 static int initialize_geometry_block(struct vdo *vdo, 262 struct vdo_geometry_block *geometry_block) 263 { 264 int result; 265 266 result = vdo_allocate(VDO_BLOCK_SIZE, "encoded geometry block", 267 (char **) &vdo->geometry_block.buffer); 268 if (result != VDO_SUCCESS) 269 return result; 270 271 return allocate_vio_components(vdo, VIO_TYPE_GEOMETRY, 272 VIO_PRIORITY_METADATA, NULL, 1, 273 (char *) geometry_block->buffer, 274 &vdo->geometry_block.vio); 275 } 276 277 static int initialize_super_block(struct vdo *vdo, struct vdo_super_block *super_block) 278 { 279 int result; 280 281 result = vdo_allocate(VDO_BLOCK_SIZE, "encoded super block", 282 (char **) &vdo->super_block.buffer); 283 if (result != VDO_SUCCESS) 284 return result; 285 286 return allocate_vio_components(vdo, VIO_TYPE_SUPER_BLOCK, 287 VIO_PRIORITY_METADATA, NULL, 1, 288 (char *) super_block->buffer, 289 &vdo->super_block.vio); 290 } 291 292 static bool get_zone_thread_name(const thread_id_t thread_ids[], zone_count_t count, 293 thread_id_t id, const char *prefix, 294 char *buffer, size_t buffer_length) 295 { 296 if (id >= thread_ids[0]) { 297 thread_id_t index = id - thread_ids[0]; 298 299 if (index < count) { 300 snprintf(buffer, buffer_length, "%s%d", prefix, index); 301 return true; 302 } 303 } 304 305 return false; 306 } 307 308 /** 309 * get_thread_name() - Format the name of the worker thread desired to support a given work queue. 310 * @thread_config: The thread configuration. 311 * @thread_id: The thread id. 312 * @buffer: Where to put the formatted name. 313 * @buffer_length: Size of the output buffer. 314 * 315 * The physical layer may add a prefix identifying the product; the output from this function 316 * should just identify the thread. 317 */ 318 static void get_thread_name(const struct thread_config *thread_config, 319 thread_id_t thread_id, char *buffer, size_t buffer_length) 320 { 321 if (thread_id == thread_config->journal_thread) { 322 if (thread_config->packer_thread == thread_id) { 323 /* 324 * This is the "single thread" config where one thread is used for the 325 * journal, packer, logical, physical, and hash zones. In that case, it is 326 * known as the "request queue." 327 */ 328 snprintf(buffer, buffer_length, "reqQ"); 329 return; 330 } 331 332 snprintf(buffer, buffer_length, "journalQ"); 333 return; 334 } else if (thread_id == thread_config->admin_thread) { 335 /* Theoretically this could be different from the journal thread. */ 336 snprintf(buffer, buffer_length, "adminQ"); 337 return; 338 } else if (thread_id == thread_config->packer_thread) { 339 snprintf(buffer, buffer_length, "packerQ"); 340 return; 341 } else if (thread_id == thread_config->dedupe_thread) { 342 snprintf(buffer, buffer_length, "dedupeQ"); 343 return; 344 } else if (thread_id == thread_config->bio_ack_thread) { 345 snprintf(buffer, buffer_length, "ackQ"); 346 return; 347 } else if (thread_id == thread_config->cpu_thread) { 348 snprintf(buffer, buffer_length, "cpuQ"); 349 return; 350 } 351 352 if (get_zone_thread_name(thread_config->logical_threads, 353 thread_config->logical_zone_count, 354 thread_id, "logQ", buffer, buffer_length)) 355 return; 356 357 if (get_zone_thread_name(thread_config->physical_threads, 358 thread_config->physical_zone_count, 359 thread_id, "physQ", buffer, buffer_length)) 360 return; 361 362 if (get_zone_thread_name(thread_config->hash_zone_threads, 363 thread_config->hash_zone_count, 364 thread_id, "hashQ", buffer, buffer_length)) 365 return; 366 367 if (get_zone_thread_name(thread_config->bio_threads, 368 thread_config->bio_thread_count, 369 thread_id, "bioQ", buffer, buffer_length)) 370 return; 371 372 /* Some sort of misconfiguration? */ 373 snprintf(buffer, buffer_length, "reqQ%d", thread_id); 374 } 375 376 /** 377 * vdo_make_thread() - Construct a single vdo work_queue and its associated thread (or threads for 378 * round-robin queues). 379 * @vdo: The vdo which owns the thread. 380 * @thread_id: The id of the thread to create (as determined by the thread_config). 381 * @type: The description of the work queue for this thread. 382 * @queue_count: The number of actual threads/queues contained in the "thread". 383 * @contexts: An array of queue_count contexts, one for each individual queue; may be NULL. 384 * 385 * Each "thread" constructed by this method is represented by a unique thread id in the thread 386 * config, and completions can be enqueued to the queue and run on the threads comprising this 387 * entity. 388 * 389 * Return: VDO_SUCCESS or an error. 390 */ 391 int vdo_make_thread(struct vdo *vdo, thread_id_t thread_id, 392 const struct vdo_work_queue_type *type, 393 unsigned int queue_count, void *contexts[]) 394 { 395 struct vdo_thread *thread = &vdo->threads[thread_id]; 396 char queue_name[MAX_VDO_WORK_QUEUE_NAME_LEN]; 397 398 if (type == NULL) 399 type = &default_queue_type; 400 401 if (thread->queue != NULL) { 402 return VDO_ASSERT(vdo_work_queue_type_is(thread->queue, type), 403 "already constructed vdo thread %u is of the correct type", 404 thread_id); 405 } 406 407 thread->vdo = vdo; 408 thread->thread_id = thread_id; 409 get_thread_name(&vdo->thread_config, thread_id, queue_name, sizeof(queue_name)); 410 return vdo_make_work_queue(vdo->thread_name_prefix, queue_name, thread, 411 type, queue_count, contexts, &thread->queue); 412 } 413 414 /** 415 * register_vdo() - Register a VDO; it must not already be registered. 416 * @vdo: The vdo to register. 417 * 418 * Return: VDO_SUCCESS or an error. 419 */ 420 static int register_vdo(struct vdo *vdo) 421 { 422 int result; 423 424 write_lock(®istry.lock); 425 result = VDO_ASSERT(filter_vdos_locked(vdo_is_equal, vdo) == NULL, 426 "VDO not already registered"); 427 if (result == VDO_SUCCESS) { 428 INIT_LIST_HEAD(&vdo->registration); 429 list_add_tail(&vdo->registration, ®istry.links); 430 } 431 write_unlock(®istry.lock); 432 433 return result; 434 } 435 436 /** 437 * vdo_format() - Format a block device to function as a new VDO. 438 * @vdo: The vdo to format. 439 * @error_ptr: The reason for any failure during this call. 440 * 441 * This function must be called on a device before a VDO can be loaded for the first time. 442 * Once a device has been formatted, the VDO can be loaded and shut down repeatedly. 443 * If a new VDO is desired, this function should be called again. 444 * 445 * Return: VDO_SUCCESS or an error 446 **/ 447 static int __must_check vdo_format(struct vdo *vdo, char **error_ptr) 448 { 449 int result; 450 uuid_t uuid; 451 nonce_t nonce = current_time_us(); 452 struct device_config *config = vdo->device_config; 453 454 struct index_config index_config = { 455 .mem = config->index_memory, 456 .sparse = config->index_sparse, 457 }; 458 459 struct vdo_config vdo_config = { 460 .logical_blocks = config->logical_blocks, 461 .physical_blocks = config->physical_blocks, 462 .slab_size = config->slab_blocks, 463 .slab_journal_blocks = DEFAULT_VDO_SLAB_JOURNAL_SIZE, 464 .recovery_journal_size = DEFAULT_VDO_RECOVERY_JOURNAL_SIZE, 465 }; 466 467 uuid_gen(&uuid); 468 result = vdo_initialize_volume_geometry(nonce, &uuid, &index_config, &vdo->geometry); 469 if (result != VDO_SUCCESS) { 470 *error_ptr = "Could not initialize volume geometry during format"; 471 return result; 472 } 473 474 result = vdo_initialize_component_states(&vdo_config, &vdo->geometry, nonce, &vdo->states); 475 if (result == VDO_NO_SPACE) { 476 block_count_t slab_blocks = config->slab_blocks; 477 /* 1 is counting geometry block */ 478 block_count_t fixed_layout_size = 1 + 479 vdo->geometry.regions[VDO_DATA_REGION].start_block + 480 DEFAULT_VDO_BLOCK_MAP_TREE_ROOT_COUNT + 481 DEFAULT_VDO_RECOVERY_JOURNAL_SIZE + VDO_SLAB_SUMMARY_BLOCKS; 482 block_count_t necessary_size = fixed_layout_size + slab_blocks; 483 484 vdo_log_error("Minimum required size for VDO volume: %llu bytes", 485 (unsigned long long) necessary_size * VDO_BLOCK_SIZE); 486 *error_ptr = "Could not allocate enough space for VDO during format"; 487 return result; 488 } 489 if (result != VDO_SUCCESS) { 490 *error_ptr = "Could not initialize data layout during format"; 491 return result; 492 } 493 494 vdo->needs_formatting = true; 495 496 return VDO_SUCCESS; 497 } 498 499 /** 500 * initialize_vdo() - Do the portion of initializing a vdo which will clean up after itself on 501 * error. 502 * @vdo: The vdo being initialized 503 * @config: The configuration of the vdo 504 * @instance: The instance number of the vdo 505 * @reason: The buffer to hold the failure reason on error 506 */ 507 static int initialize_vdo(struct vdo *vdo, struct device_config *config, 508 unsigned int instance, char **reason) 509 { 510 int result; 511 zone_count_t i; 512 513 vdo->device_config = config; 514 vdo->starting_sector_offset = config->owning_target->begin; 515 vdo->instance = instance; 516 vdo->allocations_allowed = true; 517 vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_NEW); 518 INIT_LIST_HEAD(&vdo->device_config_list); 519 vdo_initialize_completion(&vdo->admin.completion, vdo, VDO_ADMIN_COMPLETION); 520 init_completion(&vdo->admin.callback_sync); 521 mutex_init(&vdo->stats_mutex); 522 523 result = initialize_geometry_block(vdo, &vdo->geometry_block); 524 if (result != VDO_SUCCESS) { 525 *reason = "Could not initialize geometry block"; 526 return result; 527 } 528 529 result = initialize_super_block(vdo, &vdo->super_block); 530 if (result != VDO_SUCCESS) { 531 *reason = "Could not initialize super block"; 532 return result; 533 } 534 535 result = vdo_submit_metadata_vio_wait(&vdo->geometry_block.vio, 536 VDO_GEOMETRY_BLOCK_LOCATION, REQ_OP_READ); 537 if (result != VDO_SUCCESS) { 538 *reason = "Could not load geometry block"; 539 return result; 540 } 541 542 if (mem_is_zero(vdo->geometry_block.vio.data, VDO_BLOCK_SIZE)) { 543 result = vdo_format(vdo, reason); 544 if (result != VDO_SUCCESS) 545 return result; 546 } else { 547 result = vdo_parse_geometry_block(vdo->geometry_block.buffer, 548 &vdo->geometry); 549 if (result != VDO_SUCCESS) { 550 *reason = "Could not parse geometry block"; 551 return result; 552 } 553 } 554 555 result = initialize_thread_config(config->thread_counts, &vdo->thread_config); 556 if (result != VDO_SUCCESS) { 557 *reason = "Cannot create thread configuration"; 558 return result; 559 } 560 561 vdo_log_info("zones: %d logical, %d physical, %d hash; total threads: %d", 562 config->thread_counts.logical_zones, 563 config->thread_counts.physical_zones, 564 config->thread_counts.hash_zones, vdo->thread_config.thread_count); 565 566 /* Compression context storage */ 567 result = vdo_allocate(config->thread_counts.cpu_threads, "LZ4 context", 568 &vdo->compression_context); 569 if (result != VDO_SUCCESS) { 570 *reason = "cannot allocate LZ4 context"; 571 return result; 572 } 573 574 for (i = 0; i < config->thread_counts.cpu_threads; i++) { 575 result = vdo_allocate(LZ4_MEM_COMPRESS, "LZ4 context", 576 &vdo->compression_context[i]); 577 if (result != VDO_SUCCESS) { 578 *reason = "cannot allocate LZ4 context"; 579 return result; 580 } 581 } 582 583 result = register_vdo(vdo); 584 if (result != VDO_SUCCESS) { 585 *reason = "Cannot add VDO to device registry"; 586 return result; 587 } 588 589 vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_INITIALIZED); 590 return result; 591 } 592 593 /** 594 * vdo_make() - Allocate and initialize a vdo. 595 * @instance: Device instantiation counter. 596 * @config: The device configuration. 597 * @reason: The reason for any failure during this call. 598 * @vdo_ptr: A pointer to hold the created vdo. 599 * 600 * Return: VDO_SUCCESS or an error. 601 */ 602 int vdo_make(unsigned int instance, struct device_config *config, char **reason, 603 struct vdo **vdo_ptr) 604 { 605 int result; 606 struct vdo *vdo; 607 608 /* Initialize with a generic failure reason to prevent returning garbage. */ 609 *reason = "Unspecified error"; 610 611 result = vdo_allocate(1, __func__, &vdo); 612 if (result != VDO_SUCCESS) { 613 *reason = "Cannot allocate VDO"; 614 return result; 615 } 616 617 result = initialize_vdo(vdo, config, instance, reason); 618 if (result != VDO_SUCCESS) { 619 vdo_destroy(vdo); 620 return result; 621 } 622 623 /* From here on, the caller will clean up if there is an error. */ 624 *vdo_ptr = vdo; 625 626 snprintf(vdo->thread_name_prefix, sizeof(vdo->thread_name_prefix), 627 "vdo%u", instance); 628 result = vdo_allocate(vdo->thread_config.thread_count, __func__, &vdo->threads); 629 if (result != VDO_SUCCESS) { 630 *reason = "Cannot allocate thread structures"; 631 return result; 632 } 633 634 result = vdo_make_thread(vdo, vdo->thread_config.admin_thread, 635 &default_queue_type, 1, NULL); 636 if (result != VDO_SUCCESS) { 637 *reason = "Cannot make admin thread"; 638 return result; 639 } 640 641 result = vdo_make_flusher(vdo); 642 if (result != VDO_SUCCESS) { 643 *reason = "Cannot make flusher zones"; 644 return result; 645 } 646 647 result = vdo_make_packer(vdo, DEFAULT_PACKER_BINS, &vdo->packer); 648 if (result != VDO_SUCCESS) { 649 *reason = "Cannot make packer zones"; 650 return result; 651 } 652 653 BUG_ON(vdo->device_config->logical_block_size <= 0); 654 BUG_ON(vdo->device_config->owned_device == NULL); 655 result = make_data_vio_pool(vdo, MAXIMUM_VDO_USER_VIOS, 656 MAXIMUM_VDO_USER_VIOS * 3 / 4, 657 &vdo->data_vio_pool); 658 if (result != VDO_SUCCESS) { 659 *reason = "Cannot allocate data_vio pool"; 660 return result; 661 } 662 663 result = vdo_make_io_submitter(config->thread_counts.bio_threads, 664 config->thread_counts.bio_rotation_interval, 665 get_data_vio_pool_request_limit(vdo->data_vio_pool), 666 vdo, &vdo->io_submitter); 667 if (result != VDO_SUCCESS) { 668 *reason = "bio submission initialization failed"; 669 return result; 670 } 671 672 if (vdo_uses_bio_ack_queue(vdo)) { 673 result = vdo_make_thread(vdo, vdo->thread_config.bio_ack_thread, 674 &bio_ack_q_type, 675 config->thread_counts.bio_ack_threads, NULL); 676 if (result != VDO_SUCCESS) { 677 *reason = "bio ack queue initialization failed"; 678 return result; 679 } 680 } 681 682 result = vdo_make_thread(vdo, vdo->thread_config.cpu_thread, &cpu_q_type, 683 config->thread_counts.cpu_threads, 684 (void **) vdo->compression_context); 685 if (result != VDO_SUCCESS) { 686 *reason = "CPU queue initialization failed"; 687 return result; 688 } 689 690 return VDO_SUCCESS; 691 } 692 693 static void finish_vdo(struct vdo *vdo) 694 { 695 int i; 696 697 if (vdo->threads == NULL) 698 return; 699 700 vdo_cleanup_io_submitter(vdo->io_submitter); 701 vdo_finish_dedupe_index(vdo->hash_zones); 702 703 for (i = 0; i < vdo->thread_config.thread_count; i++) 704 vdo_finish_work_queue(vdo->threads[i].queue); 705 } 706 707 /** 708 * free_listeners() - Free the list of read-only listeners associated with a thread. 709 * @thread: The thread holding the list to free. 710 */ 711 static void free_listeners(struct vdo_thread *thread) 712 { 713 struct read_only_listener *listener, *next; 714 715 for (listener = vdo_forget(thread->listeners); listener != NULL; listener = next) { 716 next = vdo_forget(listener->next); 717 vdo_free(listener); 718 } 719 } 720 721 static void uninitialize_geometry_block(struct vdo_geometry_block *geometry_block) 722 { 723 free_vio_components(&geometry_block->vio); 724 vdo_free(geometry_block->buffer); 725 } 726 727 static void uninitialize_super_block(struct vdo_super_block *super_block) 728 { 729 free_vio_components(&super_block->vio); 730 vdo_free(super_block->buffer); 731 } 732 733 /** 734 * unregister_vdo() - Remove a vdo from the device registry. 735 * @vdo: The vdo to remove. 736 */ 737 static void unregister_vdo(struct vdo *vdo) 738 { 739 write_lock(®istry.lock); 740 if (filter_vdos_locked(vdo_is_equal, vdo) == vdo) 741 list_del_init(&vdo->registration); 742 743 write_unlock(®istry.lock); 744 } 745 746 /** 747 * vdo_destroy() - Destroy a vdo instance. 748 * @vdo: The vdo to destroy (may be NULL). 749 */ 750 void vdo_destroy(struct vdo *vdo) 751 { 752 unsigned int i; 753 754 if (vdo == NULL) 755 return; 756 757 /* A running VDO should never be destroyed without suspending first. */ 758 BUG_ON(vdo_get_admin_state(vdo)->normal); 759 760 vdo->allocations_allowed = true; 761 762 finish_vdo(vdo); 763 unregister_vdo(vdo); 764 free_data_vio_pool(vdo->data_vio_pool); 765 vdo_free_io_submitter(vdo_forget(vdo->io_submitter)); 766 vdo_free_flusher(vdo_forget(vdo->flusher)); 767 vdo_free_packer(vdo_forget(vdo->packer)); 768 vdo_free_recovery_journal(vdo_forget(vdo->recovery_journal)); 769 vdo_free_slab_depot(vdo_forget(vdo->depot)); 770 vdo_uninitialize_layout(&vdo->layout); 771 vdo_uninitialize_layout(&vdo->next_layout); 772 if (vdo->partition_copier) 773 dm_kcopyd_client_destroy(vdo_forget(vdo->partition_copier)); 774 uninitialize_geometry_block(&vdo->geometry_block); 775 uninitialize_super_block(&vdo->super_block); 776 vdo_free_block_map(vdo_forget(vdo->block_map)); 777 vdo_free_hash_zones(vdo_forget(vdo->hash_zones)); 778 vdo_free_physical_zones(vdo_forget(vdo->physical_zones)); 779 vdo_free_logical_zones(vdo_forget(vdo->logical_zones)); 780 781 if (vdo->threads != NULL) { 782 for (i = 0; i < vdo->thread_config.thread_count; i++) { 783 free_listeners(&vdo->threads[i]); 784 vdo_free_work_queue(vdo_forget(vdo->threads[i].queue)); 785 } 786 vdo_free(vdo_forget(vdo->threads)); 787 } 788 789 uninitialize_thread_config(&vdo->thread_config); 790 791 if (vdo->compression_context != NULL) { 792 for (i = 0; i < vdo->device_config->thread_counts.cpu_threads; i++) 793 vdo_free(vdo_forget(vdo->compression_context[i])); 794 795 vdo_free(vdo_forget(vdo->compression_context)); 796 } 797 vdo_free(vdo); 798 } 799 800 /** 801 * finish_reading_super_block() - Continue after loading the super block. 802 * @completion: The super block vio. 803 * 804 * This callback is registered in vdo_load_super_block(). 805 */ 806 static void finish_reading_super_block(struct vdo_completion *completion) 807 { 808 struct vdo_super_block *super_block = 809 container_of(as_vio(completion), struct vdo_super_block, vio); 810 811 vdo_continue_completion(vdo_forget(completion->parent), 812 vdo_decode_super_block(super_block->buffer)); 813 } 814 815 /** 816 * handle_super_block_read_error() - Handle an error reading the super block. 817 * @completion: The super block vio. 818 * 819 * This error handler is registered in vdo_load_super_block(). 820 */ 821 static void handle_super_block_read_error(struct vdo_completion *completion) 822 { 823 vio_record_metadata_io_error(as_vio(completion)); 824 finish_reading_super_block(completion); 825 } 826 827 static void read_super_block_endio(struct bio *bio) 828 { 829 struct vio *vio = bio->bi_private; 830 struct vdo_completion *parent = vio->completion.parent; 831 832 continue_vio_after_io(vio, finish_reading_super_block, 833 parent->callback_thread_id); 834 } 835 836 /** 837 * vdo_load_super_block() - Allocate a super block and read its contents from storage. 838 * @vdo: The vdo containing the super block on disk. 839 * @parent: The completion to notify after loading the super block. 840 */ 841 void vdo_load_super_block(struct vdo *vdo, struct vdo_completion *parent) 842 { 843 vdo->super_block.vio.completion.parent = parent; 844 vdo_submit_metadata_vio(&vdo->super_block.vio, 845 vdo_get_data_region_start(vdo->geometry), 846 read_super_block_endio, 847 handle_super_block_read_error, 848 REQ_OP_READ); 849 } 850 851 /** 852 * vdo_get_backing_device() - Get the block device object underlying a vdo. 853 * @vdo: The vdo. 854 * 855 * Return: The vdo's current block device. 856 */ 857 struct block_device *vdo_get_backing_device(const struct vdo *vdo) 858 { 859 return vdo->device_config->owned_device->bdev; 860 } 861 862 /** 863 * vdo_get_device_name() - Get the device name associated with the vdo target. 864 * @target: The target device interface. 865 * 866 * Return: The block device name. 867 */ 868 const char *vdo_get_device_name(const struct dm_target *target) 869 { 870 return dm_device_name(dm_table_get_md(target->table)); 871 } 872 873 /** 874 * vdo_synchronous_flush() - Issue a flush request and wait for it to complete. 875 * @vdo: The vdo. 876 * 877 * Return: VDO_SUCCESS or an error. 878 */ 879 int vdo_synchronous_flush(struct vdo *vdo) 880 { 881 int result; 882 struct bio bio; 883 884 bio_init(&bio, vdo_get_backing_device(vdo), NULL, 0, 885 REQ_OP_WRITE | REQ_PREFLUSH); 886 submit_bio_wait(&bio); 887 result = blk_status_to_errno(bio.bi_status); 888 889 atomic64_inc(&vdo->stats.flush_out); 890 if (result != 0) { 891 vdo_log_error_strerror(result, "synchronous flush failed"); 892 result = -EIO; 893 } 894 895 bio_uninit(&bio); 896 return result; 897 } 898 899 /** 900 * vdo_get_state() - Get the current state of the vdo. 901 * @vdo: The vdo. 902 * 903 * Context: This method may be called from any thread. 904 * 905 * Return: The current state of the vdo. 906 */ 907 enum vdo_state vdo_get_state(const struct vdo *vdo) 908 { 909 enum vdo_state state = atomic_read(&vdo->state); 910 911 /* pairs with barriers where state field is changed */ 912 smp_rmb(); 913 return state; 914 } 915 916 /** 917 * vdo_set_state() - Set the current state of the vdo. 918 * @vdo: The vdo whose state is to be set. 919 * @state: The new state of the vdo. 920 * 921 * Context: This method may be called from any thread. 922 */ 923 void vdo_set_state(struct vdo *vdo, enum vdo_state state) 924 { 925 /* pairs with barrier in vdo_get_state */ 926 smp_wmb(); 927 atomic_set(&vdo->state, state); 928 } 929 930 /** 931 * vdo_get_admin_state() - Get the admin state of the vdo. 932 * @vdo: The vdo. 933 * 934 * Return: The code for the vdo's current admin state. 935 */ 936 const struct admin_state_code *vdo_get_admin_state(const struct vdo *vdo) 937 { 938 return vdo_get_admin_state_code(&vdo->admin.state); 939 } 940 941 /** 942 * record_vdo() - Record the state of the VDO for encoding in the super block. 943 * @vdo: The vdo. 944 */ 945 static void record_vdo(struct vdo *vdo) 946 { 947 /* This is for backwards compatibility. */ 948 vdo->states.unused = vdo->geometry.unused; 949 vdo->states.vdo.state = vdo_get_state(vdo); 950 vdo->states.block_map = vdo_record_block_map(vdo->block_map); 951 vdo->states.recovery_journal = vdo_record_recovery_journal(vdo->recovery_journal); 952 vdo->states.slab_depot = vdo_record_slab_depot(vdo->depot); 953 vdo->states.layout = vdo->layout; 954 } 955 956 static int __must_check clear_partition(struct vdo *vdo, enum partition_id id) 957 { 958 struct partition *partition; 959 int result; 960 961 result = vdo_get_partition(&vdo->states.layout, id, &partition); 962 if (result != VDO_SUCCESS) 963 return result; 964 965 return blkdev_issue_zeroout(vdo_get_backing_device(vdo), 966 partition->offset * VDO_SECTORS_PER_BLOCK, 967 partition->count * VDO_SECTORS_PER_BLOCK, 968 GFP_NOWAIT, 0); 969 } 970 971 int vdo_clear_layout(struct vdo *vdo) 972 { 973 int result; 974 975 /* Zero out the uds index's first block. */ 976 result = blkdev_issue_zeroout(vdo_get_backing_device(vdo), 977 VDO_SECTORS_PER_BLOCK, 978 VDO_SECTORS_PER_BLOCK, 979 GFP_NOWAIT, 0); 980 if (result != VDO_SUCCESS) 981 return result; 982 983 result = clear_partition(vdo, VDO_BLOCK_MAP_PARTITION); 984 if (result != VDO_SUCCESS) 985 return result; 986 987 return clear_partition(vdo, VDO_RECOVERY_JOURNAL_PARTITION); 988 } 989 990 /** 991 * continue_parent() - Continue the parent of a save operation. 992 * @completion: The completion to continue. 993 * 994 */ 995 static void continue_parent(struct vdo_completion *completion) 996 { 997 vdo_continue_completion(vdo_forget(completion->parent), completion->result); 998 } 999 1000 static void handle_write_endio(struct bio *bio) 1001 { 1002 struct vio *vio = bio->bi_private; 1003 struct vdo_completion *parent = vio->completion.parent; 1004 1005 continue_vio_after_io(vio, continue_parent, 1006 parent->callback_thread_id); 1007 } 1008 1009 /** 1010 * handle_geometry_block_save_error() - Log a geometry block save error. 1011 * @completion: The super block vio. 1012 * 1013 * This error handler is registered in vdo_save_geometry_block(). 1014 */ 1015 static void handle_geometry_block_save_error(struct vdo_completion *completion) 1016 { 1017 struct vdo_geometry_block *geometry_block = 1018 container_of(as_vio(completion), struct vdo_geometry_block, vio); 1019 1020 vio_record_metadata_io_error(&geometry_block->vio); 1021 vdo_log_error_strerror(completion->result, "geometry block save failed"); 1022 completion->callback(completion); 1023 } 1024 1025 /** 1026 * vdo_save_geometry_block() - Encode the vdo and save the geometry block asynchronously. 1027 * @vdo: The vdo whose state is being saved. 1028 * @parent: The completion to notify when the save is complete. 1029 */ 1030 void vdo_save_geometry_block(struct vdo *vdo, struct vdo_completion *parent) 1031 { 1032 struct vdo_geometry_block *geometry_block = &vdo->geometry_block; 1033 1034 vdo_encode_volume_geometry(geometry_block->buffer, &vdo->geometry, 1035 VDO_DEFAULT_GEOMETRY_BLOCK_VERSION); 1036 geometry_block->vio.completion.parent = parent; 1037 geometry_block->vio.completion.callback_thread_id = parent->callback_thread_id; 1038 vdo_submit_metadata_vio(&geometry_block->vio, 1039 VDO_GEOMETRY_BLOCK_LOCATION, 1040 handle_write_endio, handle_geometry_block_save_error, 1041 REQ_OP_WRITE | REQ_PREFLUSH | REQ_FUA); 1042 } 1043 1044 /** 1045 * handle_super_block_save_error() - Log a super block save error. 1046 * @completion: The super block vio. 1047 * 1048 * This error handler is registered in vdo_save_components(). 1049 */ 1050 static void handle_super_block_save_error(struct vdo_completion *completion) 1051 { 1052 struct vdo_super_block *super_block = 1053 container_of(as_vio(completion), struct vdo_super_block, vio); 1054 1055 vio_record_metadata_io_error(&super_block->vio); 1056 vdo_log_error_strerror(completion->result, "super block save failed"); 1057 /* 1058 * Mark the super block as unwritable so that we won't attempt to write it again. This 1059 * avoids the case where a growth attempt fails writing the super block with the new size, 1060 * but the subsequent attempt to write out the read-only state succeeds. In this case, 1061 * writes which happened just before the suspend would not be visible if the VDO is 1062 * restarted without rebuilding, but, after a read-only rebuild, the effects of those 1063 * writes would reappear. 1064 */ 1065 super_block->unwritable = true; 1066 completion->callback(completion); 1067 } 1068 1069 /** 1070 * vdo_save_super_block() - Save the component states to the super block asynchronously. 1071 * @vdo: The vdo whose state is being saved. 1072 * @parent: The completion to notify when the save is complete. 1073 */ 1074 void vdo_save_super_block(struct vdo *vdo, struct vdo_completion *parent) 1075 { 1076 struct vdo_super_block *super_block = &vdo->super_block; 1077 1078 vdo_encode_super_block(super_block->buffer, &vdo->states); 1079 super_block->vio.completion.parent = parent; 1080 super_block->vio.completion.callback_thread_id = parent->callback_thread_id; 1081 vdo_submit_metadata_vio(&super_block->vio, 1082 vdo_get_data_region_start(vdo->geometry), 1083 handle_write_endio, handle_super_block_save_error, 1084 REQ_OP_WRITE | REQ_PREFLUSH | REQ_FUA); 1085 } 1086 1087 /** 1088 * vdo_save_components() - Copy the current state of the VDO to the states struct and save 1089 * it to the super block asynchronously. 1090 * @vdo: The vdo whose state is being saved. 1091 * @parent: The completion to notify when the save is complete. 1092 */ 1093 void vdo_save_components(struct vdo *vdo, struct vdo_completion *parent) 1094 { 1095 struct vdo_super_block *super_block = &vdo->super_block; 1096 1097 if (super_block->unwritable) { 1098 vdo_continue_completion(parent, VDO_READ_ONLY); 1099 return; 1100 } 1101 1102 if (super_block->vio.completion.parent != NULL) { 1103 vdo_continue_completion(parent, VDO_COMPONENT_BUSY); 1104 return; 1105 } 1106 1107 record_vdo(vdo); 1108 vdo_save_super_block(vdo, parent); 1109 } 1110 1111 /** 1112 * vdo_register_read_only_listener() - Register a listener to be notified when the VDO goes 1113 * read-only. 1114 * @vdo: The vdo to register with. 1115 * @listener: The object to notify. 1116 * @notification: The function to call to send the notification. 1117 * @thread_id: The id of the thread on which to send the notification. 1118 * 1119 * Return: VDO_SUCCESS or an error. 1120 */ 1121 int vdo_register_read_only_listener(struct vdo *vdo, void *listener, 1122 vdo_read_only_notification_fn notification, 1123 thread_id_t thread_id) 1124 { 1125 struct vdo_thread *thread = &vdo->threads[thread_id]; 1126 struct read_only_listener *read_only_listener; 1127 int result; 1128 1129 result = VDO_ASSERT(thread_id != vdo->thread_config.dedupe_thread, 1130 "read only listener not registered on dedupe thread"); 1131 if (result != VDO_SUCCESS) 1132 return result; 1133 1134 result = vdo_allocate(1, __func__, &read_only_listener); 1135 if (result != VDO_SUCCESS) 1136 return result; 1137 1138 *read_only_listener = (struct read_only_listener) { 1139 .listener = listener, 1140 .notify = notification, 1141 .next = thread->listeners, 1142 }; 1143 1144 thread->listeners = read_only_listener; 1145 return VDO_SUCCESS; 1146 } 1147 1148 /** 1149 * notify_vdo_of_read_only_mode() - Notify a vdo that it is going read-only. 1150 * @listener: The vdo. 1151 * @parent: The completion to notify in order to acknowledge the notification. 1152 * 1153 * This will save the read-only state to the super block. 1154 * 1155 * Implements vdo_read_only_notification_fn. 1156 */ 1157 static void notify_vdo_of_read_only_mode(void *listener, struct vdo_completion *parent) 1158 { 1159 struct vdo *vdo = listener; 1160 1161 if (vdo_in_read_only_mode(vdo)) 1162 vdo_finish_completion(parent); 1163 1164 vdo_set_state(vdo, VDO_READ_ONLY_MODE); 1165 vdo_save_components(vdo, parent); 1166 } 1167 1168 /** 1169 * vdo_enable_read_only_entry() - Enable a vdo to enter read-only mode on errors. 1170 * @vdo: The vdo to enable. 1171 * 1172 * Return: VDO_SUCCESS or an error. 1173 */ 1174 int vdo_enable_read_only_entry(struct vdo *vdo) 1175 { 1176 thread_id_t id; 1177 bool is_read_only = vdo_in_read_only_mode(vdo); 1178 struct read_only_notifier *notifier = &vdo->read_only_notifier; 1179 1180 if (is_read_only) { 1181 notifier->read_only_error = VDO_READ_ONLY; 1182 notifier->state = NOTIFIED; 1183 } else { 1184 notifier->state = MAY_NOT_NOTIFY; 1185 } 1186 1187 spin_lock_init(¬ifier->lock); 1188 vdo_initialize_completion(¬ifier->completion, vdo, 1189 VDO_READ_ONLY_MODE_COMPLETION); 1190 1191 for (id = 0; id < vdo->thread_config.thread_count; id++) 1192 vdo->threads[id].is_read_only = is_read_only; 1193 1194 return vdo_register_read_only_listener(vdo, vdo, notify_vdo_of_read_only_mode, 1195 vdo->thread_config.admin_thread); 1196 } 1197 1198 /** 1199 * vdo_wait_until_not_entering_read_only_mode() - Wait until no read-only notifications are in 1200 * progress and prevent any subsequent 1201 * notifications. 1202 * @parent: The completion to notify when no threads are entering read-only mode. 1203 * 1204 * Notifications may be re-enabled by calling vdo_allow_read_only_mode_entry(). 1205 */ 1206 void vdo_wait_until_not_entering_read_only_mode(struct vdo_completion *parent) 1207 { 1208 struct vdo *vdo = parent->vdo; 1209 struct read_only_notifier *notifier = &vdo->read_only_notifier; 1210 1211 vdo_assert_on_admin_thread(vdo, __func__); 1212 1213 if (notifier->waiter != NULL) { 1214 vdo_continue_completion(parent, VDO_COMPONENT_BUSY); 1215 return; 1216 } 1217 1218 spin_lock(¬ifier->lock); 1219 if (notifier->state == NOTIFYING) 1220 notifier->waiter = parent; 1221 else if (notifier->state == MAY_NOTIFY) 1222 notifier->state = MAY_NOT_NOTIFY; 1223 spin_unlock(¬ifier->lock); 1224 1225 if (notifier->waiter == NULL) { 1226 /* 1227 * A notification was not in progress, and now they are 1228 * disallowed. 1229 */ 1230 vdo_launch_completion(parent); 1231 return; 1232 } 1233 } 1234 1235 /** 1236 * as_notifier() - Convert a generic vdo_completion to a read_only_notifier. 1237 * @completion: The completion to convert. 1238 * 1239 * Return: The completion as a read_only_notifier. 1240 */ 1241 static inline struct read_only_notifier *as_notifier(struct vdo_completion *completion) 1242 { 1243 vdo_assert_completion_type(completion, VDO_READ_ONLY_MODE_COMPLETION); 1244 return container_of(completion, struct read_only_notifier, completion); 1245 } 1246 1247 /** 1248 * finish_entering_read_only_mode() - Complete the process of entering read only mode. 1249 * @completion: The read-only mode completion. 1250 */ 1251 static void finish_entering_read_only_mode(struct vdo_completion *completion) 1252 { 1253 struct read_only_notifier *notifier = as_notifier(completion); 1254 1255 vdo_assert_on_admin_thread(completion->vdo, __func__); 1256 1257 spin_lock(¬ifier->lock); 1258 notifier->state = NOTIFIED; 1259 spin_unlock(¬ifier->lock); 1260 1261 if (notifier->waiter != NULL) 1262 vdo_continue_completion(vdo_forget(notifier->waiter), 1263 completion->result); 1264 } 1265 1266 /** 1267 * make_thread_read_only() - Inform each thread that the VDO is in read-only mode. 1268 * @completion: The read-only mode completion. 1269 */ 1270 static void make_thread_read_only(struct vdo_completion *completion) 1271 { 1272 struct vdo *vdo = completion->vdo; 1273 thread_id_t thread_id = completion->callback_thread_id; 1274 struct read_only_notifier *notifier = as_notifier(completion); 1275 struct read_only_listener *listener = completion->parent; 1276 1277 if (listener == NULL) { 1278 /* This is the first call on this thread */ 1279 struct vdo_thread *thread = &vdo->threads[thread_id]; 1280 1281 thread->is_read_only = true; 1282 listener = thread->listeners; 1283 if (thread_id == 0) 1284 vdo_log_error_strerror(READ_ONCE(notifier->read_only_error), 1285 "Unrecoverable error, entering read-only mode"); 1286 } else { 1287 /* We've just finished notifying a listener */ 1288 listener = listener->next; 1289 } 1290 1291 if (listener != NULL) { 1292 /* We have a listener to notify */ 1293 vdo_prepare_completion(completion, make_thread_read_only, 1294 make_thread_read_only, thread_id, 1295 listener); 1296 listener->notify(listener->listener, completion); 1297 return; 1298 } 1299 1300 /* We're done with this thread */ 1301 if (++thread_id == vdo->thread_config.dedupe_thread) { 1302 /* 1303 * We don't want to notify the dedupe thread since it may be 1304 * blocked rebuilding the index. 1305 */ 1306 thread_id++; 1307 } 1308 1309 if (thread_id >= vdo->thread_config.thread_count) { 1310 /* There are no more threads */ 1311 vdo_prepare_completion(completion, finish_entering_read_only_mode, 1312 finish_entering_read_only_mode, 1313 vdo->thread_config.admin_thread, NULL); 1314 } else { 1315 vdo_prepare_completion(completion, make_thread_read_only, 1316 make_thread_read_only, thread_id, NULL); 1317 } 1318 1319 vdo_launch_completion(completion); 1320 } 1321 1322 /** 1323 * vdo_allow_read_only_mode_entry() - Allow the notifier to put the VDO into read-only mode, 1324 * reversing the effects of 1325 * vdo_wait_until_not_entering_read_only_mode(). 1326 * @parent: The object to notify once the operation is complete. 1327 * 1328 * If some thread tried to put the vdo into read-only mode while notifications were disallowed, it 1329 * will be done when this method is called. If that happens, the parent will not be notified until 1330 * the vdo has actually entered read-only mode and attempted to save the super block. 1331 * 1332 * Context: This method may only be called from the admin thread. 1333 */ 1334 void vdo_allow_read_only_mode_entry(struct vdo_completion *parent) 1335 { 1336 struct vdo *vdo = parent->vdo; 1337 struct read_only_notifier *notifier = &vdo->read_only_notifier; 1338 1339 vdo_assert_on_admin_thread(vdo, __func__); 1340 1341 if (notifier->waiter != NULL) { 1342 vdo_continue_completion(parent, VDO_COMPONENT_BUSY); 1343 return; 1344 } 1345 1346 spin_lock(¬ifier->lock); 1347 if (notifier->state == MAY_NOT_NOTIFY) { 1348 if (notifier->read_only_error == VDO_SUCCESS) { 1349 notifier->state = MAY_NOTIFY; 1350 } else { 1351 notifier->state = NOTIFYING; 1352 notifier->waiter = parent; 1353 } 1354 } 1355 spin_unlock(¬ifier->lock); 1356 1357 if (notifier->waiter == NULL) { 1358 /* We're done */ 1359 vdo_launch_completion(parent); 1360 return; 1361 } 1362 1363 /* Do the pending notification. */ 1364 make_thread_read_only(¬ifier->completion); 1365 } 1366 1367 /** 1368 * vdo_enter_read_only_mode() - Put a VDO into read-only mode and save the read-only state in the 1369 * super block. 1370 * @vdo: The vdo. 1371 * @error_code: The error which caused the VDO to enter read-only mode. 1372 * 1373 * This method is a no-op if the VDO is already read-only. 1374 */ 1375 void vdo_enter_read_only_mode(struct vdo *vdo, int error_code) 1376 { 1377 bool notify = false; 1378 thread_id_t thread_id = vdo_get_callback_thread_id(); 1379 struct read_only_notifier *notifier = &vdo->read_only_notifier; 1380 struct vdo_thread *thread; 1381 1382 if (thread_id != VDO_INVALID_THREAD_ID) { 1383 thread = &vdo->threads[thread_id]; 1384 if (thread->is_read_only) { 1385 /* This thread has already gone read-only. */ 1386 return; 1387 } 1388 1389 /* Record for this thread that the VDO is read-only. */ 1390 thread->is_read_only = true; 1391 } 1392 1393 spin_lock(¬ifier->lock); 1394 if (notifier->read_only_error == VDO_SUCCESS) { 1395 WRITE_ONCE(notifier->read_only_error, error_code); 1396 if (notifier->state == MAY_NOTIFY) { 1397 notifier->state = NOTIFYING; 1398 notify = true; 1399 } 1400 } 1401 spin_unlock(¬ifier->lock); 1402 1403 if (!notify) { 1404 /* The notifier is already aware of a read-only error */ 1405 return; 1406 } 1407 1408 /* Initiate a notification starting on the lowest numbered thread. */ 1409 vdo_launch_completion_callback(¬ifier->completion, make_thread_read_only, 0); 1410 } 1411 1412 /** 1413 * vdo_is_read_only() - Check whether the VDO is read-only. 1414 * @vdo: The vdo. 1415 * 1416 * Return: True if the vdo is read-only. 1417 * 1418 * This method may be called from any thread, as opposed to examining the VDO's state field which 1419 * is only safe to check from the admin thread. 1420 */ 1421 bool vdo_is_read_only(struct vdo *vdo) 1422 { 1423 return vdo->threads[vdo_get_callback_thread_id()].is_read_only; 1424 } 1425 1426 /** 1427 * vdo_in_read_only_mode() - Check whether a vdo is in read-only mode. 1428 * @vdo: The vdo to query. 1429 * 1430 * Return: True if the vdo is in read-only mode. 1431 */ 1432 bool vdo_in_read_only_mode(const struct vdo *vdo) 1433 { 1434 return (vdo_get_state(vdo) == VDO_READ_ONLY_MODE); 1435 } 1436 1437 /** 1438 * vdo_in_recovery_mode() - Check whether the vdo is in recovery mode. 1439 * @vdo: The vdo to query. 1440 * 1441 * Return: True if the vdo is in recovery mode. 1442 */ 1443 bool vdo_in_recovery_mode(const struct vdo *vdo) 1444 { 1445 return (vdo_get_state(vdo) == VDO_RECOVERING); 1446 } 1447 1448 /** 1449 * vdo_enter_recovery_mode() - Put the vdo into recovery mode. 1450 * @vdo: The vdo. 1451 */ 1452 void vdo_enter_recovery_mode(struct vdo *vdo) 1453 { 1454 vdo_assert_on_admin_thread(vdo, __func__); 1455 1456 if (vdo_in_read_only_mode(vdo)) 1457 return; 1458 1459 vdo_log_info("Entering recovery mode"); 1460 vdo_set_state(vdo, VDO_RECOVERING); 1461 } 1462 1463 /** 1464 * complete_synchronous_action() - Signal the waiting thread that a synchronous action is complete. 1465 * @completion: The sync completion. 1466 */ 1467 static void complete_synchronous_action(struct vdo_completion *completion) 1468 { 1469 vdo_assert_completion_type(completion, VDO_SYNC_COMPLETION); 1470 complete(&(container_of(completion, struct sync_completion, 1471 vdo_completion)->completion)); 1472 } 1473 1474 /** 1475 * perform_synchronous_action() - Launch an action on a VDO thread and wait for it to complete. 1476 * @vdo: The vdo. 1477 * @action: The callback to launch. 1478 * @thread_id: The thread on which to run the action. 1479 * @parent: The parent of the sync completion (may be NULL). 1480 */ 1481 static int perform_synchronous_action(struct vdo *vdo, vdo_action_fn action, 1482 thread_id_t thread_id, void *parent) 1483 { 1484 struct sync_completion sync; 1485 1486 vdo_initialize_completion(&sync.vdo_completion, vdo, VDO_SYNC_COMPLETION); 1487 init_completion(&sync.completion); 1488 sync.vdo_completion.parent = parent; 1489 vdo_launch_completion_callback(&sync.vdo_completion, action, thread_id); 1490 wait_for_completion(&sync.completion); 1491 return sync.vdo_completion.result; 1492 } 1493 1494 /** 1495 * set_compression_callback() - Callback to turn compression on or off. 1496 * @completion: The completion. 1497 */ 1498 static void set_compression_callback(struct vdo_completion *completion) 1499 { 1500 struct vdo *vdo = completion->vdo; 1501 bool *enable = completion->parent; 1502 bool was_enabled = vdo_get_compressing(vdo); 1503 1504 if (*enable != was_enabled) { 1505 WRITE_ONCE(vdo->compressing, *enable); 1506 if (was_enabled) { 1507 /* Signal the packer to flush since compression has been disabled. */ 1508 vdo_flush_packer(vdo->packer); 1509 } 1510 } 1511 1512 vdo_log_info("compression is %s", (*enable ? "enabled" : "disabled")); 1513 *enable = was_enabled; 1514 complete_synchronous_action(completion); 1515 } 1516 1517 /** 1518 * vdo_set_compressing() - Turn compression on or off. 1519 * @vdo: The vdo. 1520 * @enable: Whether to enable or disable compression. 1521 * 1522 * Return: Whether compression was previously on or off. 1523 */ 1524 bool vdo_set_compressing(struct vdo *vdo, bool enable) 1525 { 1526 perform_synchronous_action(vdo, set_compression_callback, 1527 vdo->thread_config.packer_thread, 1528 &enable); 1529 return enable; 1530 } 1531 1532 /** 1533 * vdo_get_compressing() - Get whether compression is enabled in a vdo. 1534 * @vdo: The vdo. 1535 * 1536 * Return: State of compression. 1537 */ 1538 bool vdo_get_compressing(struct vdo *vdo) 1539 { 1540 return READ_ONCE(vdo->compressing); 1541 } 1542 1543 static size_t get_block_map_cache_size(const struct vdo *vdo) 1544 { 1545 return ((size_t) vdo->device_config->cache_size) * VDO_BLOCK_SIZE; 1546 } 1547 1548 static struct error_statistics __must_check get_vdo_error_statistics(const struct vdo *vdo) 1549 { 1550 /* 1551 * The error counts can be incremented from arbitrary threads and so must be incremented 1552 * atomically, but they are just statistics with no semantics that could rely on memory 1553 * order, so unfenced reads are sufficient. 1554 */ 1555 const struct atomic_statistics *atoms = &vdo->stats; 1556 1557 return (struct error_statistics) { 1558 .invalid_advice_pbn_count = atomic64_read(&atoms->invalid_advice_pbn_count), 1559 .no_space_error_count = atomic64_read(&atoms->no_space_error_count), 1560 .read_only_error_count = atomic64_read(&atoms->read_only_error_count), 1561 }; 1562 } 1563 1564 static void copy_bio_stat(struct bio_stats *b, const struct atomic_bio_stats *a) 1565 { 1566 b->read = atomic64_read(&a->read); 1567 b->write = atomic64_read(&a->write); 1568 b->discard = atomic64_read(&a->discard); 1569 b->flush = atomic64_read(&a->flush); 1570 b->empty_flush = atomic64_read(&a->empty_flush); 1571 b->fua = atomic64_read(&a->fua); 1572 } 1573 1574 static struct bio_stats subtract_bio_stats(struct bio_stats minuend, 1575 struct bio_stats subtrahend) 1576 { 1577 return (struct bio_stats) { 1578 .read = minuend.read - subtrahend.read, 1579 .write = minuend.write - subtrahend.write, 1580 .discard = minuend.discard - subtrahend.discard, 1581 .flush = minuend.flush - subtrahend.flush, 1582 .empty_flush = minuend.empty_flush - subtrahend.empty_flush, 1583 .fua = minuend.fua - subtrahend.fua, 1584 }; 1585 } 1586 1587 /** 1588 * vdo_get_physical_blocks_allocated() - Get the number of physical blocks in use by user data. 1589 * @vdo: The vdo. 1590 * 1591 * Return: The number of blocks allocated for user data. 1592 */ 1593 static block_count_t __must_check vdo_get_physical_blocks_allocated(const struct vdo *vdo) 1594 { 1595 return (vdo_get_slab_depot_allocated_blocks(vdo->depot) - 1596 vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal)); 1597 } 1598 1599 /** 1600 * vdo_get_physical_blocks_overhead() - Get the number of physical blocks used by vdo metadata. 1601 * @vdo: The vdo. 1602 * 1603 * Return: The number of overhead blocks. 1604 */ 1605 static block_count_t __must_check vdo_get_physical_blocks_overhead(const struct vdo *vdo) 1606 { 1607 /* 1608 * config.physical_blocks is mutated during resize and is in a packed structure, 1609 * but resize runs on admin thread. 1610 * TODO: Verify that this is always safe. 1611 */ 1612 return (vdo->states.vdo.config.physical_blocks - 1613 vdo_get_slab_depot_data_blocks(vdo->depot) + 1614 vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal)); 1615 } 1616 1617 static const char *vdo_describe_state(enum vdo_state state) 1618 { 1619 /* These strings should all fit in the 15 chars of VDOStatistics.mode. */ 1620 switch (state) { 1621 case VDO_RECOVERING: 1622 return "recovering"; 1623 1624 case VDO_READ_ONLY_MODE: 1625 return "read-only"; 1626 1627 default: 1628 return "normal"; 1629 } 1630 } 1631 1632 /** 1633 * get_vdo_statistics() - Populate a vdo_statistics structure on the admin thread. 1634 * @vdo: The vdo. 1635 * @stats: The statistics structure to populate. 1636 */ 1637 static void get_vdo_statistics(const struct vdo *vdo, struct vdo_statistics *stats) 1638 { 1639 struct recovery_journal *journal = vdo->recovery_journal; 1640 enum vdo_state state = vdo_get_state(vdo); 1641 1642 vdo_assert_on_admin_thread(vdo, __func__); 1643 1644 /* start with a clean slate */ 1645 memset(stats, 0, sizeof(struct vdo_statistics)); 1646 1647 /* 1648 * These are immutable properties of the vdo object, so it is safe to query them from any 1649 * thread. 1650 */ 1651 stats->version = STATISTICS_VERSION; 1652 stats->logical_blocks = vdo->states.vdo.config.logical_blocks; 1653 /* 1654 * config.physical_blocks is mutated during resize and is in a packed structure, but resize 1655 * runs on the admin thread. 1656 * TODO: verify that this is always safe 1657 */ 1658 stats->physical_blocks = vdo->states.vdo.config.physical_blocks; 1659 stats->block_size = VDO_BLOCK_SIZE; 1660 stats->complete_recoveries = vdo->states.vdo.complete_recoveries; 1661 stats->read_only_recoveries = vdo->states.vdo.read_only_recoveries; 1662 stats->block_map_cache_size = get_block_map_cache_size(vdo); 1663 1664 /* The callees are responsible for thread-safety. */ 1665 stats->data_blocks_used = vdo_get_physical_blocks_allocated(vdo); 1666 stats->overhead_blocks_used = vdo_get_physical_blocks_overhead(vdo); 1667 stats->logical_blocks_used = vdo_get_recovery_journal_logical_blocks_used(journal); 1668 vdo_get_slab_depot_statistics(vdo->depot, stats); 1669 stats->journal = vdo_get_recovery_journal_statistics(journal); 1670 stats->packer = vdo_get_packer_statistics(vdo->packer); 1671 stats->block_map = vdo_get_block_map_statistics(vdo->block_map); 1672 vdo_get_dedupe_statistics(vdo->hash_zones, stats); 1673 stats->errors = get_vdo_error_statistics(vdo); 1674 stats->in_recovery_mode = (state == VDO_RECOVERING); 1675 snprintf(stats->mode, sizeof(stats->mode), "%s", vdo_describe_state(state)); 1676 1677 stats->instance = vdo->instance; 1678 stats->current_vios_in_progress = get_data_vio_pool_active_requests(vdo->data_vio_pool); 1679 stats->max_vios = get_data_vio_pool_maximum_requests(vdo->data_vio_pool); 1680 1681 stats->flush_out = atomic64_read(&vdo->stats.flush_out); 1682 stats->logical_block_size = vdo->device_config->logical_block_size; 1683 copy_bio_stat(&stats->bios_in, &vdo->stats.bios_in); 1684 copy_bio_stat(&stats->bios_in_partial, &vdo->stats.bios_in_partial); 1685 copy_bio_stat(&stats->bios_out, &vdo->stats.bios_out); 1686 copy_bio_stat(&stats->bios_meta, &vdo->stats.bios_meta); 1687 copy_bio_stat(&stats->bios_journal, &vdo->stats.bios_journal); 1688 copy_bio_stat(&stats->bios_page_cache, &vdo->stats.bios_page_cache); 1689 copy_bio_stat(&stats->bios_out_completed, &vdo->stats.bios_out_completed); 1690 copy_bio_stat(&stats->bios_meta_completed, &vdo->stats.bios_meta_completed); 1691 copy_bio_stat(&stats->bios_journal_completed, 1692 &vdo->stats.bios_journal_completed); 1693 copy_bio_stat(&stats->bios_page_cache_completed, 1694 &vdo->stats.bios_page_cache_completed); 1695 copy_bio_stat(&stats->bios_acknowledged, &vdo->stats.bios_acknowledged); 1696 copy_bio_stat(&stats->bios_acknowledged_partial, &vdo->stats.bios_acknowledged_partial); 1697 stats->bios_in_progress = 1698 subtract_bio_stats(stats->bios_in, stats->bios_acknowledged); 1699 vdo_get_memory_stats(&stats->memory_usage.bytes_used, 1700 &stats->memory_usage.peak_bytes_used); 1701 } 1702 1703 /** 1704 * vdo_fetch_statistics_callback() - Action to populate a vdo_statistics 1705 * structure on the admin thread. 1706 * @completion: The completion. 1707 * 1708 * This callback is registered in vdo_fetch_statistics(). 1709 */ 1710 static void vdo_fetch_statistics_callback(struct vdo_completion *completion) 1711 { 1712 get_vdo_statistics(completion->vdo, completion->parent); 1713 complete_synchronous_action(completion); 1714 } 1715 1716 /** 1717 * vdo_fetch_statistics() - Fetch statistics on the correct thread. 1718 * @vdo: The vdo. 1719 * @stats: The vdo statistics are returned here. 1720 */ 1721 void vdo_fetch_statistics(struct vdo *vdo, struct vdo_statistics *stats) 1722 { 1723 perform_synchronous_action(vdo, vdo_fetch_statistics_callback, 1724 vdo->thread_config.admin_thread, stats); 1725 } 1726 1727 /** 1728 * vdo_get_callback_thread_id() - Get the id of the callback thread on which a completion is 1729 * currently running. 1730 * 1731 * Return: The current thread ID, or -1 if no such thread. 1732 */ 1733 thread_id_t vdo_get_callback_thread_id(void) 1734 { 1735 struct vdo_work_queue *queue = vdo_get_current_work_queue(); 1736 struct vdo_thread *thread; 1737 thread_id_t thread_id; 1738 1739 if (queue == NULL) 1740 return VDO_INVALID_THREAD_ID; 1741 1742 thread = vdo_get_work_queue_owner(queue); 1743 thread_id = thread->thread_id; 1744 1745 if (PARANOID_THREAD_CONSISTENCY_CHECKS) { 1746 BUG_ON(thread_id >= thread->vdo->thread_config.thread_count); 1747 BUG_ON(thread != &thread->vdo->threads[thread_id]); 1748 } 1749 1750 return thread_id; 1751 } 1752 1753 /** 1754 * vdo_dump_status() - Dump status information about a vdo to the log for debugging. 1755 * @vdo: The vdo to dump. 1756 */ 1757 void vdo_dump_status(const struct vdo *vdo) 1758 { 1759 zone_count_t zone; 1760 1761 vdo_dump_flusher(vdo->flusher); 1762 vdo_dump_recovery_journal_statistics(vdo->recovery_journal); 1763 vdo_dump_packer(vdo->packer); 1764 vdo_dump_slab_depot(vdo->depot); 1765 1766 for (zone = 0; zone < vdo->thread_config.logical_zone_count; zone++) 1767 vdo_dump_logical_zone(&vdo->logical_zones->zones[zone]); 1768 1769 for (zone = 0; zone < vdo->thread_config.physical_zone_count; zone++) 1770 vdo_dump_physical_zone(&vdo->physical_zones->zones[zone]); 1771 1772 vdo_dump_hash_zones(vdo->hash_zones); 1773 } 1774 1775 /** 1776 * vdo_assert_on_admin_thread() - Assert that we are running on the admin thread. 1777 * @vdo: The vdo. 1778 * @name: The name of the function which should be running on the admin thread (for logging). 1779 */ 1780 void vdo_assert_on_admin_thread(const struct vdo *vdo, const char *name) 1781 { 1782 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.admin_thread), 1783 "%s called on admin thread", name); 1784 } 1785 1786 /** 1787 * vdo_assert_on_logical_zone_thread() - Assert that this function was called on the specified 1788 * logical zone thread. 1789 * @vdo: The vdo. 1790 * @logical_zone: The number of the logical zone. 1791 * @name: The name of the calling function. 1792 */ 1793 void vdo_assert_on_logical_zone_thread(const struct vdo *vdo, zone_count_t logical_zone, 1794 const char *name) 1795 { 1796 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == 1797 vdo->thread_config.logical_threads[logical_zone]), 1798 "%s called on logical thread", name); 1799 } 1800 1801 /** 1802 * vdo_assert_on_physical_zone_thread() - Assert that this function was called on the specified 1803 * physical zone thread. 1804 * @vdo: The vdo. 1805 * @physical_zone: The number of the physical zone. 1806 * @name: The name of the calling function. 1807 */ 1808 void vdo_assert_on_physical_zone_thread(const struct vdo *vdo, 1809 zone_count_t physical_zone, const char *name) 1810 { 1811 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == 1812 vdo->thread_config.physical_threads[physical_zone]), 1813 "%s called on physical thread", name); 1814 } 1815 1816 /** 1817 * vdo_get_physical_zone() - Get the physical zone responsible for a given physical block number. 1818 * @vdo: The vdo containing the physical zones. 1819 * @pbn: The PBN of the data block. 1820 * @zone_ptr: A pointer to return the physical zone. 1821 * 1822 * Gets the physical zone responsible for a given physical block number of a data block in this vdo 1823 * instance, or of the zero block (for which a NULL zone is returned). For any other block number 1824 * that is not in the range of valid data block numbers in any slab, an error will be returned. 1825 * This function is safe to call on invalid block numbers; it will not put the vdo into read-only 1826 * mode. 1827 * 1828 * Return: VDO_SUCCESS or VDO_OUT_OF_RANGE if the block number is invalid or an error code for any 1829 * other failure. 1830 */ 1831 int vdo_get_physical_zone(const struct vdo *vdo, physical_block_number_t pbn, 1832 struct physical_zone **zone_ptr) 1833 { 1834 struct vdo_slab *slab; 1835 int result; 1836 1837 if (pbn == VDO_ZERO_BLOCK) { 1838 *zone_ptr = NULL; 1839 return VDO_SUCCESS; 1840 } 1841 1842 /* 1843 * Used because it does a more restrictive bounds check than vdo_get_slab(), and done first 1844 * because it won't trigger read-only mode on an invalid PBN. 1845 */ 1846 if (!vdo_is_physical_data_block(vdo->depot, pbn)) 1847 return VDO_OUT_OF_RANGE; 1848 1849 /* With the PBN already checked, we should always succeed in finding a slab. */ 1850 slab = vdo_get_slab(vdo->depot, pbn); 1851 result = VDO_ASSERT(slab != NULL, "vdo_get_slab must succeed on all valid PBNs"); 1852 if (result != VDO_SUCCESS) 1853 return result; 1854 1855 *zone_ptr = &vdo->physical_zones->zones[slab->allocator->zone_number]; 1856 return VDO_SUCCESS; 1857 } 1858