// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "index.h"

#include "logger.h"
#include "memory-alloc.h"

#include "funnel-requestqueue.h"
#include "hash-utils.h"
#include "sparse-cache.h"

static const u64 NO_LAST_SAVE = U64_MAX;

/*
 * When searching for deduplication records, the index first searches the volume index, and then
 * searches the chapter index for the relevant chapter. If the chapter has been fully committed to
 * storage, the chapter pages are loaded into the page cache. If the chapter has not yet been
 * committed (either the open chapter or a recently closed one), the index searches the in-memory
 * representation of the chapter. Finally, if the volume index does not find a record and the index
 * is sparse, the index will search the sparse cache.
 *
 * The index sends two kinds of messages to coordinate between zones: chapter close messages for
 * the chapter writer, and sparse cache barrier messages for the sparse cache.
 *
 * The chapter writer is responsible for committing chapters of records to storage. Since zones can
 * get different numbers of records, some zones may fall behind others. Each time a zone fills up
 * its available space in a chapter, it informs the chapter writer that the chapter is complete,
 * and also informs all other zones that it has closed the chapter. Each other zone will then close
 * the chapter immediately, regardless of how full it is, in order to minimize skew between zones.
 * Once every zone has closed the chapter, the chapter writer will commit that chapter to storage.
 *
 * The last zone to close the chapter also removes the oldest chapter from the volume index.
 * Although that chapter is invalid for zones that have moved on, the existence of the open chapter
 * means that those zones will never ask the volume index about it. No zone is allowed to get more
 * than one chapter ahead of any other. If a zone is so far ahead that it tries to close another
 * chapter before the previous one has been closed by all zones, it is forced to wait.
 *
 * The sparse cache relies on having the same set of chapter indexes available to all zones. When a
 * request wants to add a chapter to the sparse cache, it sends a barrier message to each zone
 * during the triage stage that acts as a rendezvous. Once every zone has reached the barrier and
 * paused its operations, the cache membership is changed and each zone is then informed that it
 * can proceed. More details can be found in the sparse cache documentation.
 *
 * If a sparse index has only one zone, it will not create a triage queue, but it still needs the
 * barrier message to change the sparse cache membership, so the index simulates the message by
 * invoking the handler directly.
 */
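
/*
 * Illustrative sketch (comment only, not part of the build): the sparse cache barrier described
 * above is just a uds_request carrying a zone message instead of a record. Requesting that every
 * zone admit virtual chapter 42 to the sparse cache amounts to:
 *
 *	struct uds_zone_message message = {
 *		.type = UDS_MESSAGE_SPARSE_CACHE_BARRIER,
 *		.virtual_chapter = 42,
 *	};
 *
 *	for (zone = 0; zone < index->zone_count; zone++)
 *		launch_zone_message(message, zone, index);
 *
 * which is what enqueue_barrier_messages() below does. Each zone thread receives its copy in
 * execute_zone_request() and calls uds_update_sparse_cache(), which performs the rendezvous.
 */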

struct chapter_writer {
	/* The index to which we belong */
	struct uds_index *index;
	/* The thread to do the writing */
	struct thread *thread;
	/* The lock protecting the following fields */
	struct mutex mutex;
	/* The condition signalled on state changes */
	struct cond_var cond;
	/* Set to true to stop the thread */
	bool stop;
	/* The result from the most recent write */
	int result;
	/* The number of bytes allocated by the chapter writer */
	size_t memory_size;
	/* The number of zones which have submitted a chapter for writing */
	unsigned int zones_to_write;
	/* Open chapter index used by uds_close_open_chapter() */
	struct open_chapter_index *open_chapter_index;
	/* Collated records used by uds_close_open_chapter() */
	struct uds_volume_record *collated_records;
	/* The chapters to write (one per zone) */
	struct open_chapter_zone *chapters[];
};

static bool is_zone_chapter_sparse(const struct index_zone *zone, u64 virtual_chapter)
{
	return uds_is_chapter_sparse(zone->index->volume->geometry,
				     zone->oldest_virtual_chapter,
				     zone->newest_virtual_chapter, virtual_chapter);
}

static int launch_zone_message(struct uds_zone_message message, unsigned int zone,
			       struct uds_index *index)
{
	int result;
	struct uds_request *request;

	result = vdo_allocate(1, struct uds_request, __func__, &request);
	if (result != VDO_SUCCESS)
		return result;

	request->index = index;
	request->unbatched = true;
	request->zone_number = zone;
	request->zone_message = message;

	uds_enqueue_request(request, STAGE_MESSAGE);
	return UDS_SUCCESS;
}

static void enqueue_barrier_messages(struct uds_index *index, u64 virtual_chapter)
{
	struct uds_zone_message message = {
		.type = UDS_MESSAGE_SPARSE_CACHE_BARRIER,
		.virtual_chapter = virtual_chapter,
	};
	unsigned int zone;

	for (zone = 0; zone < index->zone_count; zone++) {
		int result = launch_zone_message(message, zone, index);

		VDO_ASSERT_LOG_ONLY((result == UDS_SUCCESS), "barrier message allocation");
	}
}

/*
 * Determine whether this request should trigger a sparse cache barrier message to change the
 * membership of the sparse cache. If a change in membership is desired, the function returns the
 * chapter number to add.
 */
static u64 triage_index_request(struct uds_index *index, struct uds_request *request)
{
	u64 virtual_chapter;
	struct index_zone *zone;

	virtual_chapter = uds_lookup_volume_index_name(index->volume_index,
						       &request->record_name);
	if (virtual_chapter == NO_CHAPTER)
		return NO_CHAPTER;

	zone = index->zones[request->zone_number];
	if (!is_zone_chapter_sparse(zone, virtual_chapter))
		return NO_CHAPTER;

	/*
	 * FIXME: Optimize for a common case by remembering the chapter from the most recent
	 * barrier message and skipping this chapter if it is the same.
	 */

	return virtual_chapter;
}
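
/*
 * A minimal sketch of the FIXME above (hypothetical, not implemented here): each zone could
 * remember the chapter named in the most recent barrier message and decline to trigger another
 * barrier for it. Assuming a new per-zone field such as zone->last_barrier_chapter,
 * triage_index_request() would end with something like:
 *
 *	if (virtual_chapter == zone->last_barrier_chapter)
 *		return NO_CHAPTER;
 *	zone->last_barrier_chapter = virtual_chapter;
 *	return virtual_chapter;
 *
 * Keeping the state per zone would avoid new locking, at the cost of one redundant barrier per
 * zone after each membership change.
 */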

/*
 * Simulate a message to change the sparse cache membership for a single-zone sparse index. This
 * allows us to forgo the complicated locking required by a multi-zone sparse index. Any other kind
 * of index does nothing here.
 */
static int simulate_index_zone_barrier_message(struct index_zone *zone,
					       struct uds_request *request)
{
	u64 sparse_virtual_chapter;

	if ((zone->index->zone_count > 1) ||
	    !uds_is_sparse_index_geometry(zone->index->volume->geometry))
		return UDS_SUCCESS;

	sparse_virtual_chapter = triage_index_request(zone->index, request);
	if (sparse_virtual_chapter == NO_CHAPTER)
		return UDS_SUCCESS;

	return uds_update_sparse_cache(zone, sparse_virtual_chapter);
}

/* This is the request processing function for the triage queue. */
static void triage_request(struct uds_request *request)
{
	struct uds_index *index = request->index;
	u64 sparse_virtual_chapter = triage_index_request(index, request);

	if (sparse_virtual_chapter != NO_CHAPTER)
		enqueue_barrier_messages(index, sparse_virtual_chapter);

	uds_enqueue_request(request, STAGE_INDEX);
}

static int finish_previous_chapter(struct uds_index *index, u64 current_chapter_number)
{
	int result;
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	while (index->newest_virtual_chapter < current_chapter_number)
		uds_wait_cond(&writer->cond, &writer->mutex);
	result = writer->result;
	mutex_unlock(&writer->mutex);

	if (result != UDS_SUCCESS)
		return vdo_log_error_strerror(result,
					      "Writing of previous open chapter failed");

	return UDS_SUCCESS;
}

static int swap_open_chapter(struct index_zone *zone)
{
	int result;
	struct open_chapter_zone *temporary_chapter;

	result = finish_previous_chapter(zone->index, zone->newest_virtual_chapter);
	if (result != UDS_SUCCESS)
		return result;

	temporary_chapter = zone->open_chapter;
	zone->open_chapter = zone->writing_chapter;
	zone->writing_chapter = temporary_chapter;
	return UDS_SUCCESS;
}
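
/*
 * Descriptive note (illustration only): each zone double-buffers its in-memory chapters. New
 * records accumulate in zone->open_chapter while the chapter writer thread commits
 * zone->writing_chapter to storage. The swap above needs no further locking because
 * finish_previous_chapter() has just guaranteed that the writer is done with the buffer about to
 * be reused; the caller (open_next_chapter()) then calls uds_reset_open_chapter() on the recycled
 * buffer before adding new records to it.
 */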

/*
 * Inform the chapter writer that this zone is done with this chapter. The chapter won't be
 * written until all zones have closed it.
 */
static unsigned int start_closing_chapter(struct uds_index *index,
					  unsigned int zone_number,
					  struct open_chapter_zone *chapter)
{
	unsigned int finished_zones;
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	finished_zones = ++writer->zones_to_write;
	writer->chapters[zone_number] = chapter;
	uds_broadcast_cond(&writer->cond);
	mutex_unlock(&writer->mutex);

	return finished_zones;
}

static int announce_chapter_closed(struct index_zone *zone, u64 closed_chapter)
{
	int result;
	unsigned int i;
	struct uds_zone_message zone_message = {
		.type = UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED,
		.virtual_chapter = closed_chapter,
	};

	for (i = 0; i < zone->index->zone_count; i++) {
		if (zone->id == i)
			continue;

		result = launch_zone_message(zone_message, i, zone->index);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}

static int open_next_chapter(struct index_zone *zone)
{
	int result;
	u64 closed_chapter;
	u64 expiring;
	unsigned int finished_zones;
	u32 expire_chapters;

	vdo_log_debug("closing chapter %llu of zone %u after %u entries (%u short)",
		      (unsigned long long) zone->newest_virtual_chapter, zone->id,
		      zone->open_chapter->size,
		      zone->open_chapter->capacity - zone->open_chapter->size);

	result = swap_open_chapter(zone);
	if (result != UDS_SUCCESS)
		return result;

	closed_chapter = zone->newest_virtual_chapter++;
	uds_set_volume_index_zone_open_chapter(zone->index->volume_index, zone->id,
					       zone->newest_virtual_chapter);
	uds_reset_open_chapter(zone->open_chapter);

	finished_zones = start_closing_chapter(zone->index, zone->id,
					       zone->writing_chapter);
	if ((finished_zones == 1) && (zone->index->zone_count > 1)) {
		result = announce_chapter_closed(zone, closed_chapter);
		if (result != UDS_SUCCESS)
			return result;
	}

	expiring = zone->oldest_virtual_chapter;
	expire_chapters = uds_chapters_to_expire(zone->index->volume->geometry,
						 zone->newest_virtual_chapter);
	zone->oldest_virtual_chapter += expire_chapters;

	if (finished_zones < zone->index->zone_count)
		return UDS_SUCCESS;

	while (expire_chapters-- > 0)
		uds_forget_chapter(zone->index->volume, expiring++);

	return UDS_SUCCESS;
}

static int handle_chapter_closed(struct index_zone *zone, u64 virtual_chapter)
{
	if (zone->newest_virtual_chapter == virtual_chapter)
		return open_next_chapter(zone);

	return UDS_SUCCESS;
}

static int dispatch_index_zone_control_request(struct uds_request *request)
{
	struct uds_zone_message *message = &request->zone_message;
	struct index_zone *zone = request->index->zones[request->zone_number];

	switch (message->type) {
	case UDS_MESSAGE_SPARSE_CACHE_BARRIER:
		return uds_update_sparse_cache(zone, message->virtual_chapter);

	case UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED:
		return handle_chapter_closed(zone, message->virtual_chapter);

	default:
		vdo_log_error("invalid message type: %d", message->type);
		return UDS_INVALID_ARGUMENT;
	}
}
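
/*
 * Illustrative trace (comment only): for a two-zone index, the chapter close protocol above plays
 * out roughly like this when zone 0 fills chapter N first:
 *
 *	zone 0: open_next_chapter() -> start_closing_chapter() returns 1
 *	zone 0: announce_chapter_closed(N) -> message queued for zone 1
 *	zone 1: dispatch_index_zone_control_request()
 *	        -> handle_chapter_closed(N) -> open_next_chapter()
 *	zone 1: start_closing_chapter() returns 2 (== zone_count)
 *	writer: close_chapters() wakes and commits chapter N to storage
 *
 * The last zone to arrive is also the one whose open_next_chapter() call expires the oldest
 * chapter, keeping every zone within one chapter of the others.
 */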

static void set_request_location(struct uds_request *request,
				 enum uds_index_region new_location)
{
	request->location = new_location;
	request->found = ((new_location == UDS_LOCATION_IN_OPEN_CHAPTER) ||
			  (new_location == UDS_LOCATION_IN_DENSE) ||
			  (new_location == UDS_LOCATION_IN_SPARSE));
}

static void set_chapter_location(struct uds_request *request,
				 const struct index_zone *zone, u64 virtual_chapter)
{
	request->found = true;
	if (virtual_chapter == zone->newest_virtual_chapter)
		request->location = UDS_LOCATION_IN_OPEN_CHAPTER;
	else if (is_zone_chapter_sparse(zone, virtual_chapter))
		request->location = UDS_LOCATION_IN_SPARSE;
	else
		request->location = UDS_LOCATION_IN_DENSE;
}

static int search_sparse_cache_in_zone(struct index_zone *zone, struct uds_request *request,
				       u64 virtual_chapter, bool *found)
{
	int result;
	struct volume *volume;
	u16 record_page_number;
	u32 chapter;

	result = uds_search_sparse_cache(zone, &request->record_name, &virtual_chapter,
					 &record_page_number);
	if ((result != UDS_SUCCESS) || (virtual_chapter == NO_CHAPTER))
		return result;

	request->virtual_chapter = virtual_chapter;
	volume = zone->index->volume;
	chapter = uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
	return uds_search_cached_record_page(volume, request, chapter,
					     record_page_number, found);
}

static int get_record_from_zone(struct index_zone *zone, struct uds_request *request,
				bool *found)
{
	struct volume *volume;

	if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
		*found = true;
		return UDS_SUCCESS;
	} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
		*found = false;
		return UDS_SUCCESS;
	}

	if (request->virtual_chapter == zone->newest_virtual_chapter) {
		uds_search_open_chapter(zone->open_chapter, &request->record_name,
					&request->old_metadata, found);
		return UDS_SUCCESS;
	}

	if ((zone->newest_virtual_chapter > 0) &&
	    (request->virtual_chapter == (zone->newest_virtual_chapter - 1)) &&
	    (zone->writing_chapter->size > 0)) {
		uds_search_open_chapter(zone->writing_chapter, &request->record_name,
					&request->old_metadata, found);
		return UDS_SUCCESS;
	}

	volume = zone->index->volume;
	if (is_zone_chapter_sparse(zone, request->virtual_chapter) &&
	    uds_sparse_cache_contains(volume->sparse_cache, request->virtual_chapter,
				      request->zone_number))
		return search_sparse_cache_in_zone(zone, request,
						   request->virtual_chapter, found);

	return uds_search_volume_page_cache(volume, request, found);
}

static int put_record_in_zone(struct index_zone *zone, struct uds_request *request,
			      const struct uds_record_data *metadata)
{
	unsigned int remaining;

	remaining = uds_put_open_chapter(zone->open_chapter, &request->record_name,
					 metadata);
	if (remaining == 0)
		return open_next_chapter(zone);

	return UDS_SUCCESS;
}
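
/*
 * Descriptive summary (illustration only): get_record_from_zone() above implements the search
 * order described at the top of this file. For a chapter hint from the volume index, the zone
 * consults, in order:
 *
 *	1. the open chapter (still accepting records in memory)
 *	2. the writing chapter (closed but possibly not yet committed)
 *	3. the sparse cache, if the chapter is sparse and currently cached
 *	4. the volume page cache, for chapters committed to storage
 *
 * Requeued requests short-circuit this cascade via the UDS_LOCATION_RECORD_PAGE_LOOKUP and
 * UDS_LOCATION_UNAVAILABLE markers, which preserve the result of a previous search.
 */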

static int search_index_zone(struct index_zone *zone, struct uds_request *request)
{
	int result;
	struct volume_index_record record;
	bool overflow_record, found = false;
	struct uds_record_data *metadata;
	u64 chapter;

	result = uds_get_volume_index_record(zone->index->volume_index,
					     &request->record_name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (record.is_found) {
		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
			set_request_location(request, UDS_LOCATION_UNKNOWN);

		request->virtual_chapter = record.virtual_chapter;
		result = get_record_from_zone(zone, request, &found);
		if (result != UDS_SUCCESS)
			return result;
	}

	if (found)
		set_chapter_location(request, zone, record.virtual_chapter);

	/*
	 * If a record has overflowed a chapter index in more than one chapter (or overflowed in
	 * one chapter and collided with an existing record), it will exist as a collision record
	 * in the volume index, but we won't find it in the volume. This case needs special
	 * handling.
	 */
	overflow_record = (record.is_found && record.is_collision && !found);
	chapter = zone->newest_virtual_chapter;
	if (found || overflow_record) {
		if ((request->type == UDS_QUERY_NO_UPDATE) ||
		    ((request->type == UDS_QUERY) && overflow_record)) {
			/* There is nothing left to do. */
			return UDS_SUCCESS;
		}

		if (record.virtual_chapter != chapter) {
			/*
			 * Update the volume index to reference the new chapter for the block. If
			 * the record had been deleted or dropped from the chapter index, it will
			 * be back.
			 */
			result = uds_set_volume_index_record_chapter(&record, chapter);
		} else if (request->type != UDS_UPDATE) {
			/* The record is already in the open chapter. */
			return UDS_SUCCESS;
		}
	} else {
		/*
		 * The record wasn't in the volume index, so check whether the name is in a cached
		 * sparse chapter. If we found the name on a previous search, use that result
		 * instead.
		 */
		if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
			found = true;
		} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
			found = false;
		} else if (uds_is_sparse_index_geometry(zone->index->volume->geometry) &&
			   !uds_is_volume_index_sample(zone->index->volume_index,
						       &request->record_name)) {
			result = search_sparse_cache_in_zone(zone, request, NO_CHAPTER,
							     &found);
			if (result != UDS_SUCCESS)
				return result;
		}

		if (found)
			set_request_location(request, UDS_LOCATION_IN_SPARSE);

		if ((request->type == UDS_QUERY_NO_UPDATE) ||
		    ((request->type == UDS_QUERY) && !found)) {
			/* There is nothing left to do. */
			return UDS_SUCCESS;
		}

		/*
		 * Add a new entry to the volume index referencing the open chapter. This needs to
		 * be done both for new records, and for records from cached sparse chapters.
		 */
		result = uds_put_volume_index_record(&record, chapter);
	}

	if (result == UDS_OVERFLOW) {
		/*
		 * The volume index encountered a delta list overflow. The condition was already
		 * logged. We will go on without adding the record to the open chapter.
		 */
		return UDS_SUCCESS;
	}

	if (result != UDS_SUCCESS)
		return result;

	if (!found || (request->type == UDS_UPDATE)) {
		/* This is a new record or we're updating an existing record. */
		metadata = &request->new_metadata;
	} else {
		/* Move the existing record to the open chapter. */
		metadata = &request->old_metadata;
	}

	return put_record_in_zone(zone, request, metadata);
}
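
/*
 * Illustrative decision summary for search_index_zone() above (comment only; the code is the
 * authoritative version). With "found" meaning the record was located in the volume, and
 * "overflow" the collision-record case described in the function:
 *
 *	UDS_QUERY_NO_UPDATE  any              read-only; never modifies the index
 *	UDS_QUERY            found            moves the record to the open chapter (unless it
 *	                                      is already there)
 *	UDS_QUERY            not found        nothing left to do
 *	UDS_POST             found/not found  adds or updates the volume index entry, then
 *	                                      writes to the open chapter unless already there
 *	UDS_UPDATE           any              always rewrites the record in the open chapter
 *
 * Only UDS_UPDATE (or a record not found anywhere) stores request->new_metadata; other hits carry
 * the existing record forward with request->old_metadata.
 */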

static int remove_from_index_zone(struct index_zone *zone, struct uds_request *request)
{
	int result;
	struct volume_index_record record;

	result = uds_get_volume_index_record(zone->index->volume_index,
					     &request->record_name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (!record.is_found)
		return UDS_SUCCESS;

	/* If the request was requeued, check whether the saved state is still valid. */
	if (record.is_collision) {
		set_chapter_location(request, zone, record.virtual_chapter);
	} else {
		/* Non-collision records are hints, so resolve the name in the chapter. */
		bool found;

		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
			set_request_location(request, UDS_LOCATION_UNKNOWN);

		request->virtual_chapter = record.virtual_chapter;
		result = get_record_from_zone(zone, request, &found);
		if (result != UDS_SUCCESS)
			return result;

		if (!found) {
			/* There is no record to remove. */
			return UDS_SUCCESS;
		}
	}

	set_chapter_location(request, zone, record.virtual_chapter);

	/*
	 * Delete the volume index entry for the named record only. Note that a later search might
	 * return stale advice if there is a colliding name in the same chapter, but it's a very
	 * rare case (1 in 2^21).
	 */
	result = uds_remove_volume_index_record(&record);
	if (result != UDS_SUCCESS)
		return result;

	/*
	 * If the record is in the open chapter, we must remove it or mark it deleted to avoid
	 * trouble if the record is added again later.
	 */
	if (request->location == UDS_LOCATION_IN_OPEN_CHAPTER)
		uds_remove_from_open_chapter(zone->open_chapter, &request->record_name);

	return UDS_SUCCESS;
}

static int dispatch_index_request(struct uds_index *index, struct uds_request *request)
{
	int result;
	struct index_zone *zone = index->zones[request->zone_number];

	if (!request->requeued) {
		result = simulate_index_zone_barrier_message(zone, request);
		if (result != UDS_SUCCESS)
			return result;
	}

	switch (request->type) {
	case UDS_POST:
	case UDS_UPDATE:
	case UDS_QUERY:
	case UDS_QUERY_NO_UPDATE:
		result = search_index_zone(zone, request);
		break;

	case UDS_DELETE:
		result = remove_from_index_zone(zone, request);
		break;

	default:
		result = vdo_log_warning_strerror(UDS_INVALID_ARGUMENT,
						  "invalid request type: %d",
						  request->type);
		break;
	}

	return result;
}

/* This is the request processing function invoked by each zone's thread. */
static void execute_zone_request(struct uds_request *request)
{
	int result;
	struct uds_index *index = request->index;

	if (request->zone_message.type != UDS_MESSAGE_NONE) {
		result = dispatch_index_zone_control_request(request);
		if (result != UDS_SUCCESS) {
			vdo_log_error_strerror(result, "error executing message: %d",
					       request->zone_message.type);
		}

		/* Once the message is processed it can be freed. */
		vdo_free(vdo_forget(request));
		return;
	}

	index->need_to_save = true;
	if (request->requeued && (request->status != UDS_SUCCESS)) {
		set_request_location(request, UDS_LOCATION_UNAVAILABLE);
		index->callback(request);
		return;
	}

	result = dispatch_index_request(index, request);
	if (result == UDS_QUEUED) {
		/* The request has been requeued so don't let it complete. */
		return;
	}

	if (!request->found)
		set_request_location(request, UDS_LOCATION_UNAVAILABLE);

	request->status = result;
	index->callback(request);
}

static int initialize_index_queues(struct uds_index *index,
				   const struct index_geometry *geometry)
{
	int result;
	unsigned int i;

	for (i = 0; i < index->zone_count; i++) {
		result = uds_make_request_queue("indexW", &execute_zone_request,
						&index->zone_queues[i]);
		if (result != UDS_SUCCESS)
			return result;
	}

	/* The triage queue is only needed for sparse multi-zone indexes. */
	if ((index->zone_count > 1) && uds_is_sparse_index_geometry(geometry)) {
		result = uds_make_request_queue("triageW", &triage_request,
						&index->triage_queue);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}
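
/*
 * Illustrative request lifecycle (comment only; in practice requests are filled in and launched
 * by the indexer session layer, not built by hand). With the queues above in place, a lookup
 * flows through the index roughly as:
 *
 *	request->index = index;
 *	request->type = UDS_QUERY;
 *	request->record_name = name;
 *	uds_enqueue_request(request, STAGE_TRIAGE);
 *	  -> triage_request()            (sparse multi-zone indexes only)
 *	  -> uds_enqueue_request(request, STAGE_INDEX)
 *	  -> execute_zone_request()      (on the owning zone's thread)
 *	  -> dispatch_index_request() -> search_index_zone()
 *	  -> index->callback(request)    (status and found are now set)
 */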

/* This is the driver function for the chapter writer thread. */
static void close_chapters(void *arg)
{
	int result;
	struct chapter_writer *writer = arg;
	struct uds_index *index = writer->index;

	vdo_log_debug("chapter writer starting");
	mutex_lock(&writer->mutex);
	for (;;) {
		while (writer->zones_to_write < index->zone_count) {
			if (writer->stop && (writer->zones_to_write == 0)) {
				/*
				 * We've been told to stop, and all of the zones are in the same
				 * open chapter, so we can exit now.
				 */
				mutex_unlock(&writer->mutex);
				vdo_log_debug("chapter writer stopping");
				return;
			}
			uds_wait_cond(&writer->cond, &writer->mutex);
		}

		/*
		 * Release the lock while closing a chapter. We probably don't need to do this, but
		 * it seems safer in principle. It's OK to access the chapter and chapter_number
		 * fields without the lock since those aren't allowed to change until we're done.
		 */
		mutex_unlock(&writer->mutex);

		if (index->has_saved_open_chapter) {
			/*
			 * Remove the saved open chapter the first time we close an open chapter
			 * after loading from a clean shutdown, or after doing a clean save. The
			 * lack of the saved open chapter will indicate that a recovery is
			 * necessary.
			 */
			index->has_saved_open_chapter = false;
			result = uds_discard_open_chapter(index->layout);
			if (result == UDS_SUCCESS)
				vdo_log_debug("Discarding saved open chapter");
		}

		result = uds_close_open_chapter(writer->chapters, index->zone_count,
						index->volume,
						writer->open_chapter_index,
						writer->collated_records,
						index->newest_virtual_chapter);

		mutex_lock(&writer->mutex);
		index->newest_virtual_chapter++;
		index->oldest_virtual_chapter +=
			uds_chapters_to_expire(index->volume->geometry,
					       index->newest_virtual_chapter);
		writer->result = result;
		writer->zones_to_write = 0;
		uds_broadcast_cond(&writer->cond);
	}
}

static void stop_chapter_writer(struct chapter_writer *writer)
{
	struct thread *writer_thread = NULL;

	mutex_lock(&writer->mutex);
	if (writer->thread != NULL) {
		writer_thread = writer->thread;
		writer->thread = NULL;
		writer->stop = true;
		uds_broadcast_cond(&writer->cond);
	}
	mutex_unlock(&writer->mutex);

	if (writer_thread != NULL)
		vdo_join_threads(writer_thread);
}

static void free_chapter_writer(struct chapter_writer *writer)
{
	if (writer == NULL)
		return;

	stop_chapter_writer(writer);
	uds_free_open_chapter_index(writer->open_chapter_index);
	vdo_free(writer->collated_records);
	vdo_free(writer);
}

static int make_chapter_writer(struct uds_index *index,
			       struct chapter_writer **writer_ptr)
{
	int result;
	struct chapter_writer *writer;
	size_t collated_records_size =
		(sizeof(struct uds_volume_record) * index->volume->geometry->records_per_chapter);

	result = vdo_allocate_extended(struct chapter_writer, index->zone_count,
				       struct open_chapter_zone *, "Chapter Writer",
				       &writer);
	if (result != VDO_SUCCESS)
		return result;

	writer->index = index;
	mutex_init(&writer->mutex);
	uds_init_cond(&writer->cond);

	result = vdo_allocate_cache_aligned(collated_records_size, "collated records",
					    &writer->collated_records);
	if (result != VDO_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	result = uds_make_open_chapter_index(&writer->open_chapter_index,
					     index->volume->geometry,
					     index->volume->nonce);
	if (result != UDS_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	writer->memory_size = (sizeof(struct chapter_writer) +
			       index->zone_count * sizeof(struct open_chapter_zone *) +
			       collated_records_size +
			       writer->open_chapter_index->memory_size);

	result = vdo_create_thread(close_chapters, writer, "writer", &writer->thread);
	if (result != VDO_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	*writer_ptr = writer;
	return UDS_SUCCESS;
}
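
/*
 * Illustrative lifetime of the chapter writer (comment only):
 *
 *	make_chapter_writer(index, &writer);     starts the "writer" thread
 *	  zones: start_closing_chapter(...);     each zone hands over its closed chapter
 *	  writer: close_chapters();              commits once all zones have arrived
 *	  zones: finish_previous_chapter(...);   blocks any zone running a full chapter ahead
 *	free_chapter_writer(writer);             stops and joins the thread
 *
 * All shared state is protected by writer->mutex; the chapters themselves are handed off
 * through writer->chapters[] while that lock is held.
 */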

static int load_index(struct uds_index *index)
{
	int result;
	u64 last_save_chapter;

	result = uds_load_index_state(index->layout, index);
	if (result != UDS_SUCCESS)
		return UDS_INDEX_NOT_SAVED_CLEANLY;

	last_save_chapter = ((index->last_save != NO_LAST_SAVE) ? index->last_save : 0);

	vdo_log_info("loaded index from chapter %llu through chapter %llu",
		     (unsigned long long) index->oldest_virtual_chapter,
		     (unsigned long long) last_save_chapter);

	return UDS_SUCCESS;
}

static int rebuild_index_page_map(struct uds_index *index, u64 vcn)
{
	int result;
	struct delta_index_page *chapter_index_page;
	struct index_geometry *geometry = index->volume->geometry;
	u32 chapter = uds_map_to_physical_chapter(geometry, vcn);
	u32 expected_list_number = 0;
	u32 index_page_number;
	u32 lowest_delta_list;
	u32 highest_delta_list;

	for (index_page_number = 0;
	     index_page_number < geometry->index_pages_per_chapter;
	     index_page_number++) {
		result = uds_get_volume_index_page(index->volume, chapter,
						   index_page_number,
						   &chapter_index_page);
		if (result != UDS_SUCCESS) {
			return vdo_log_error_strerror(result,
						      "failed to read index page %u in chapter %u",
						      index_page_number, chapter);
		}

		lowest_delta_list = chapter_index_page->lowest_list_number;
		highest_delta_list = chapter_index_page->highest_list_number;
		if (lowest_delta_list != expected_list_number) {
			return vdo_log_error_strerror(UDS_CORRUPT_DATA,
						      "chapter %u index page %u is corrupt",
						      chapter, index_page_number);
		}

		uds_update_index_page_map(index->volume->index_page_map, vcn, chapter,
					  index_page_number, highest_delta_list);
		expected_list_number = highest_delta_list + 1;
	}

	return UDS_SUCCESS;
}
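
/*
 * Worked example (illustration only; the numbers are made up): the loop above verifies that the
 * index pages of a chapter partition its delta lists contiguously. For a chapter whose three
 * index pages cover delta lists [0..63], [64..127], and [128..191], the checks proceed as:
 *
 *	page 0: lowest 0   == expected 0,   record highest 63,  expect 64 next
 *	page 1: lowest 64  == expected 64,  record highest 127, expect 128 next
 *	page 2: lowest 128 == expected 128, record highest 191
 *
 * Any gap or overlap between pages fails the lowest != expected check and is reported as
 * UDS_CORRUPT_DATA.
 */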

static int replay_record(struct uds_index *index, const struct uds_record_name *name,
			 u64 virtual_chapter, bool will_be_sparse_chapter)
{
	int result;
	struct volume_index_record record;
	bool update_record;

	if (will_be_sparse_chapter &&
	    !uds_is_volume_index_sample(index->volume_index, name)) {
		/*
		 * This entry will be in a sparse chapter after the rebuild completes, and it is
		 * not a sample, so just skip over it.
		 */
		return UDS_SUCCESS;
	}

	result = uds_get_volume_index_record(index->volume_index, name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (record.is_found) {
		if (record.is_collision) {
			if (record.virtual_chapter == virtual_chapter) {
				/* The record is already correct. */
				return UDS_SUCCESS;
			}

			update_record = true;
		} else if (record.virtual_chapter == virtual_chapter) {
			/*
			 * There is a volume index entry pointing to the current chapter, but we
			 * don't know if it is for the same name as the one we are currently
			 * working on or not. For now, we're just going to assume that it isn't.
			 * This will create one extra collision record if there was a deleted
			 * record in the current chapter.
			 */
			update_record = false;
		} else {
			/*
			 * If we're rebuilding, we don't normally want to go to disk to see if the
			 * record exists, since we will likely have just read the record from disk
			 * (i.e. we know it's there). The exception to this is when we find an
			 * entry in the volume index that has a different chapter. In this case, we
			 * need to search that chapter to determine if the volume index entry was
			 * for the same record or a different one.
			 */
			result = uds_search_volume_page_cache_for_rebuild(index->volume,
									  name,
									  record.virtual_chapter,
									  &update_record);
			if (result != UDS_SUCCESS)
				return result;
		}
	} else {
		update_record = false;
	}

	if (update_record) {
		/*
		 * Update the volume index to reference the new chapter for the block. If the
		 * record had been deleted or dropped from the chapter index, it will be back.
		 */
		result = uds_set_volume_index_record_chapter(&record, virtual_chapter);
	} else {
		/*
		 * Add a new entry to the volume index referencing the open chapter. This should be
		 * done regardless of whether we are a brand new record or a sparse record, i.e.
		 * one that doesn't exist in the index but does on disk, since for a sparse record,
		 * we would want to un-sparsify if it did exist.
		 */
		result = uds_put_volume_index_record(&record, virtual_chapter);
	}

	if ((result == UDS_DUPLICATE_NAME) || (result == UDS_OVERFLOW)) {
		/* The rebuilt index will lose these records. */
		return UDS_SUCCESS;
	}

	return result;
}

static bool check_for_suspend(struct uds_index *index)
{
	bool closing;

	if (index->load_context == NULL)
		return false;

	mutex_lock(&index->load_context->mutex);
	if (index->load_context->status != INDEX_SUSPENDING) {
		mutex_unlock(&index->load_context->mutex);
		return false;
	}

	/* Notify that we are suspended and wait for the resume. */
	index->load_context->status = INDEX_SUSPENDED;
	uds_broadcast_cond(&index->load_context->cond);

	while ((index->load_context->status != INDEX_OPENING) &&
	       (index->load_context->status != INDEX_FREEING))
		uds_wait_cond(&index->load_context->cond, &index->load_context->mutex);

	closing = (index->load_context->status == INDEX_FREEING);
	mutex_unlock(&index->load_context->mutex);
	return closing;
}
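
/*
 * Descriptive note (illustration only): check_for_suspend() is the rebuild side of a small state
 * machine shared with the loading thread through index->load_context:
 *
 *	INDEX_SUSPENDING  set by the suspender; observed here once per replayed chapter
 *	INDEX_SUSPENDED   set here, then broadcast so the suspender can proceed
 *	INDEX_OPENING     set on resume; replay continues
 *	INDEX_FREEING     set on shutdown; replay aborts with -EBUSY in replay_chapter()
 *
 * Checking once per chapter bounds the suspend latency to the time needed to replay a single
 * chapter.
 */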

static int replay_chapter(struct uds_index *index, u64 virtual, bool sparse)
{
	int result;
	u32 i;
	u32 j;
	const struct index_geometry *geometry;
	u32 physical_chapter;

	if (check_for_suspend(index)) {
		vdo_log_info("Replay interrupted by index shutdown at chapter %llu",
			     (unsigned long long) virtual);
		return -EBUSY;
	}

	geometry = index->volume->geometry;
	physical_chapter = uds_map_to_physical_chapter(geometry, virtual);
	uds_prefetch_volume_chapter(index->volume, physical_chapter);
	uds_set_volume_index_open_chapter(index->volume_index, virtual);

	result = rebuild_index_page_map(index, virtual);
	if (result != UDS_SUCCESS) {
		return vdo_log_error_strerror(result,
					      "could not rebuild index page map for chapter %u",
					      physical_chapter);
	}

	for (i = 0; i < geometry->record_pages_per_chapter; i++) {
		u8 *record_page;
		u32 record_page_number;

		record_page_number = geometry->index_pages_per_chapter + i;
		result = uds_get_volume_record_page(index->volume, physical_chapter,
						    record_page_number, &record_page);
		if (result != UDS_SUCCESS) {
			return vdo_log_error_strerror(result, "could not get page %d",
						      record_page_number);
		}

		for (j = 0; j < geometry->records_per_page; j++) {
			const u8 *name_bytes;
			struct uds_record_name name;

			name_bytes = record_page + (j * BYTES_PER_RECORD);
			memcpy(&name.name, name_bytes, UDS_RECORD_NAME_SIZE);
			result = replay_record(index, &name, virtual, sparse);
			if (result != UDS_SUCCESS)
				return result;
		}
	}

	return UDS_SUCCESS;
}

static int replay_volume(struct uds_index *index)
{
	int result;
	u64 old_map_update;
	u64 new_map_update;
	u64 virtual;
	u64 from_virtual = index->oldest_virtual_chapter;
	u64 upto_virtual = index->newest_virtual_chapter;
	bool will_be_sparse;

	vdo_log_info("Replaying volume from chapter %llu through chapter %llu",
		     (unsigned long long) from_virtual,
		     (unsigned long long) upto_virtual);

	/*
	 * The index failed to load, so the volume index is empty. Add records to the volume index
	 * in order, skipping non-hooks in chapters which will be sparse to save time.
	 *
	 * Go through each record page of each chapter and add the records back to the volume
	 * index. This should not cause anything to be written to either the open chapter or the
	 * on-disk volume. Also skip the on-disk chapter corresponding to upto_virtual, as this
	 * would have already been purged from the volume index when the chapter was opened.
	 *
	 * Also, go through each index page for each chapter and rebuild the index page map.
	 */
	old_map_update = index->volume->index_page_map->last_update;
	for (virtual = from_virtual; virtual < upto_virtual; virtual++) {
		will_be_sparse = uds_is_chapter_sparse(index->volume->geometry,
						       from_virtual, upto_virtual,
						       virtual);
		result = replay_chapter(index, virtual, will_be_sparse);
		if (result != UDS_SUCCESS)
			return result;
	}

	/* Also reap the chapter being replaced by the open chapter. */
	uds_set_volume_index_open_chapter(index->volume_index, upto_virtual);

	new_map_update = index->volume->index_page_map->last_update;
	if (new_map_update != old_map_update) {
		vdo_log_info("replay changed index page map update from %llu to %llu",
			     (unsigned long long) old_map_update,
			     (unsigned long long) new_map_update);
	}

	return UDS_SUCCESS;
}
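
/*
 * Worked example for the boundary arithmetic in rebuild_index() below (illustration only; the
 * numbers are made up): with chapters_per_volume == 1024, suppose the scan finds lowest == 5 and
 * highest == 1028. Then newest_virtual_chapter becomes 1029 and oldest_virtual_chapter becomes 5;
 * since 1029 == 5 + 1024, the oldest on-disk chapter occupies the physical slot that the open
 * chapter is about to overwrite, so the oldest chapter is skipped and replay covers chapters 6
 * through 1028.
 */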

static int rebuild_index(struct uds_index *index)
{
	int result;
	u64 lowest;
	u64 highest;
	bool is_empty = false;
	u32 chapters_per_volume = index->volume->geometry->chapters_per_volume;

	index->volume->lookup_mode = LOOKUP_FOR_REBUILD;
	result = uds_find_volume_chapter_boundaries(index->volume, &lowest, &highest,
						    &is_empty);
	if (result != UDS_SUCCESS) {
		return vdo_log_fatal_strerror(result,
					      "cannot rebuild index: unknown volume chapter boundaries");
	}

	if (is_empty) {
		index->newest_virtual_chapter = 0;
		index->oldest_virtual_chapter = 0;
		index->volume->lookup_mode = LOOKUP_NORMAL;
		return UDS_SUCCESS;
	}

	index->newest_virtual_chapter = highest + 1;
	index->oldest_virtual_chapter = lowest;
	if (index->newest_virtual_chapter ==
	    (index->oldest_virtual_chapter + chapters_per_volume)) {
		/* Skip the chapter shadowed by the open chapter. */
		index->oldest_virtual_chapter++;
	}

	result = replay_volume(index);
	if (result != UDS_SUCCESS)
		return result;

	index->volume->lookup_mode = LOOKUP_NORMAL;
	return UDS_SUCCESS;
}

static void free_index_zone(struct index_zone *zone)
{
	if (zone == NULL)
		return;

	uds_free_open_chapter(zone->open_chapter);
	uds_free_open_chapter(zone->writing_chapter);
	vdo_free(zone);
}

static int make_index_zone(struct uds_index *index, unsigned int zone_number)
{
	int result;
	struct index_zone *zone;

	result = vdo_allocate(1, struct index_zone, "index zone", &zone);
	if (result != VDO_SUCCESS)
		return result;

	result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
				       &zone->open_chapter);
	if (result != UDS_SUCCESS) {
		free_index_zone(zone);
		return result;
	}

	result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
				       &zone->writing_chapter);
	if (result != UDS_SUCCESS) {
		free_index_zone(zone);
		return result;
	}

	zone->index = index;
	zone->id = zone_number;
	index->zones[zone_number] = zone;

	return UDS_SUCCESS;
}
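
/*
 * Illustrative usage of uds_make_index() below (comment only; error handling elided and
 * my_callback is a hypothetical index_callback_fn):
 *
 *	struct uds_index *index;
 *
 *	result = uds_make_index(config, UDS_LOAD, NULL, my_callback, &index);
 *	...
 *	result = uds_save_index(index);
 *	uds_free_index(index);
 *
 * UDS_CREATE builds an empty index; UDS_LOAD tries to load a saved index and falls back to
 * rebuilding it from the volume; any other open type loads without attempting a rebuild.
 */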

int uds_make_index(struct uds_configuration *config, enum uds_open_index_type open_type,
		   struct index_load_context *load_context, index_callback_fn callback,
		   struct uds_index **new_index)
{
	int result;
	bool loaded = false;
	bool new = (open_type == UDS_CREATE);
	struct uds_index *index = NULL;
	struct index_zone *zone;
	u64 nonce;
	unsigned int z;

	result = vdo_allocate_extended(struct uds_index, config->zone_count,
				       struct uds_request_queue *, "index", &index);
	if (result != VDO_SUCCESS)
		return result;

	index->zone_count = config->zone_count;

	result = uds_make_index_layout(config, new, &index->layout);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = vdo_allocate(index->zone_count, struct index_zone *, "zones",
			      &index->zones);
	if (result != VDO_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = uds_make_volume(config, index->layout, &index->volume);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	index->volume->lookup_mode = LOOKUP_NORMAL;
	for (z = 0; z < index->zone_count; z++) {
		result = make_index_zone(index, z);
		if (result != UDS_SUCCESS) {
			uds_free_index(index);
			return vdo_log_error_strerror(result,
						      "Could not create index zone");
		}
	}

	nonce = uds_get_volume_nonce(index->layout);
	result = uds_make_volume_index(config, nonce, &index->volume_index);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return vdo_log_error_strerror(result, "could not make volume index");
	}

	index->load_context = load_context;
	index->callback = callback;

	result = initialize_index_queues(index, config->geometry);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = make_chapter_writer(index, &index->chapter_writer);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	if (!new) {
		result = load_index(index);
		switch (result) {
		case UDS_SUCCESS:
			loaded = true;
			break;
		case -ENOMEM:
			/* We should not try a rebuild for this error. */
			vdo_log_error_strerror(result, "index could not be loaded");
			break;
		default:
			vdo_log_error_strerror(result, "index could not be loaded");
			if (open_type == UDS_LOAD) {
				result = rebuild_index(index);
				if (result != UDS_SUCCESS) {
					vdo_log_error_strerror(result,
							       "index could not be rebuilt");
				}
			}
			break;
		}
	}

	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return vdo_log_error_strerror(result, "fatal error in %s()", __func__);
	}

	for (z = 0; z < index->zone_count; z++) {
		zone = index->zones[z];
		zone->oldest_virtual_chapter = index->oldest_virtual_chapter;
		zone->newest_virtual_chapter = index->newest_virtual_chapter;
	}

	if (index->load_context != NULL) {
		mutex_lock(&index->load_context->mutex);
		index->load_context->status = INDEX_READY;
		/*
		 * If we get here, suspend is meaningless, but notify any thread trying to suspend
		 * us so it doesn't hang.
		 */
		uds_broadcast_cond(&index->load_context->cond);
		mutex_unlock(&index->load_context->mutex);
	}

	index->has_saved_open_chapter = loaded;
	index->need_to_save = !loaded;
	*new_index = index;
	return UDS_SUCCESS;
}

void uds_free_index(struct uds_index *index)
{
	unsigned int i;

	if (index == NULL)
		return;

	uds_request_queue_finish(index->triage_queue);
	for (i = 0; i < index->zone_count; i++)
		uds_request_queue_finish(index->zone_queues[i]);

	free_chapter_writer(index->chapter_writer);

	uds_free_volume_index(index->volume_index);
	if (index->zones != NULL) {
		for (i = 0; i < index->zone_count; i++)
			free_index_zone(index->zones[i]);
		vdo_free(index->zones);
	}

	uds_free_volume(index->volume);
	uds_free_index_layout(vdo_forget(index->layout));
	vdo_free(index);
}

/* Wait for the chapter writer to complete any outstanding writes. */
void uds_wait_for_idle_index(struct uds_index *index)
{
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	while (writer->zones_to_write > 0)
		uds_wait_cond(&writer->cond, &writer->mutex);
	mutex_unlock(&writer->mutex);
}

/* This function assumes that all requests have been drained. */
int uds_save_index(struct uds_index *index)
{
	int result;

	if (!index->need_to_save)
		return UDS_SUCCESS;

	uds_wait_for_idle_index(index);
	index->prev_save = index->last_save;
	index->last_save = ((index->newest_virtual_chapter == 0) ?
			    NO_LAST_SAVE : index->newest_virtual_chapter - 1);
	vdo_log_info("beginning save (vcn %llu)", (unsigned long long) index->last_save);

	result = uds_save_index_state(index->layout, index);
	if (result != UDS_SUCCESS) {
		vdo_log_info("save index failed");
		index->last_save = index->prev_save;
	} else {
		index->has_saved_open_chapter = true;
		index->need_to_save = false;
		vdo_log_info("finished save (vcn %llu)",
			     (unsigned long long) index->last_save);
	}

	return result;
}

int uds_replace_index_storage(struct uds_index *index, struct block_device *bdev)
{
	return uds_replace_volume_storage(index->volume, index->layout, bdev);
}

/* Accessing statistics should be safe from any thread. */
void uds_get_index_stats(struct uds_index *index, struct uds_index_stats *counters)
{
	struct volume_index_stats stats;

	uds_get_volume_index_stats(index->volume_index, &stats);
	counters->entries_indexed = stats.record_count;
	counters->collisions = stats.collision_count;
	counters->entries_discarded = stats.discard_count;

	counters->memory_used = (index->volume_index->memory_size +
				 index->volume->cache_size +
				 index->chapter_writer->memory_size);
}

void uds_enqueue_request(struct uds_request *request, enum request_stage stage)
{
	struct uds_index *index = request->index;
	struct uds_request_queue *queue;

	switch (stage) {
	case STAGE_TRIAGE:
		if (index->triage_queue != NULL) {
			queue = index->triage_queue;
			break;
		}

		fallthrough;

	case STAGE_INDEX:
		request->zone_number =
			uds_get_volume_index_zone(index->volume_index, &request->record_name);
		fallthrough;

	case STAGE_MESSAGE:
		queue = index->zone_queues[request->zone_number];
		break;

	default:
		VDO_ASSERT_LOG_ONLY(false, "invalid index stage: %d", stage);
		return;
	}

	uds_request_queue_enqueue(queue, request);
}