1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * Copyright 2023 Red Hat 4 */ 5 6 7 #include "index.h" 8 9 #include "logger.h" 10 #include "memory-alloc.h" 11 12 #include "funnel-requestqueue.h" 13 #include "hash-utils.h" 14 #include "sparse-cache.h" 15 16 static const u64 NO_LAST_SAVE = U64_MAX; 17 18 /* 19 * When searching for deduplication records, the index first searches the volume index, and then 20 * searches the chapter index for the relevant chapter. If the chapter has been fully committed to 21 * storage, the chapter pages are loaded into the page cache. If the chapter has not yet been 22 * committed (either the open chapter or a recently closed one), the index searches the in-memory 23 * representation of the chapter. Finally, if the volume index does not find a record and the index 24 * is sparse, the index will search the sparse cache. 25 * 26 * The index send two kinds of messages to coordinate between zones: chapter close messages for the 27 * chapter writer, and sparse cache barrier messages for the sparse cache. 28 * 29 * The chapter writer is responsible for committing chapters of records to storage. Since zones can 30 * get different numbers of records, some zones may fall behind others. Each time a zone fills up 31 * its available space in a chapter, it informs the chapter writer that the chapter is complete, 32 * and also informs all other zones that it has closed the chapter. Each other zone will then close 33 * the chapter immediately, regardless of how full it is, in order to minimize skew between zones. 34 * Once every zone has closed the chapter, the chapter writer will commit that chapter to storage. 35 * 36 * The last zone to close the chapter also removes the oldest chapter from the volume index. 37 * Although that chapter is invalid for zones that have moved on, the existence of the open chapter 38 * means that those zones will never ask the volume index about it. 
No zone is allowed to get more 39 * than one chapter ahead of any other. If a zone is so far ahead that it tries to close another 40 * chapter before the previous one has been closed by all zones, it is forced to wait. 41 * 42 * The sparse cache relies on having the same set of chapter indexes available to all zones. When a 43 * request wants to add a chapter to the sparse cache, it sends a barrier message to each zone 44 * during the triage stage that acts as a rendezvous. Once every zone has reached the barrier and 45 * paused its operations, the cache membership is changed and each zone is then informed that it 46 * can proceed. More details can be found in the sparse cache documentation. 47 * 48 * If a sparse cache has only one zone, it will not create a triage queue, but it still needs the 49 * barrier message to change the sparse cache membership, so the index simulates the message by 50 * invoking the handler directly. 51 */ 52 53 struct chapter_writer { 54 /* The index to which we belong */ 55 struct uds_index *index; 56 /* The thread to do the writing */ 57 struct thread *thread; 58 /* The lock protecting the following fields */ 59 struct mutex mutex; 60 /* The condition signalled on state changes */ 61 struct cond_var cond; 62 /* Set to true to stop the thread */ 63 bool stop; 64 /* The result from the most recent write */ 65 int result; 66 /* The number of bytes allocated by the chapter writer */ 67 size_t memory_size; 68 /* The number of zones which have submitted a chapter for writing */ 69 unsigned int zones_to_write; 70 /* Open chapter index used by uds_close_open_chapter() */ 71 struct open_chapter_index *open_chapter_index; 72 /* Collated records used by uds_close_open_chapter() */ 73 struct uds_volume_record *collated_records; 74 /* The chapters to write (one per zone) */ 75 struct open_chapter_zone *chapters[]; 76 }; 77 78 static bool is_zone_chapter_sparse(const struct index_zone *zone, u64 virtual_chapter) 79 { 80 return 
uds_is_chapter_sparse(zone->index->volume->geometry, 81 zone->oldest_virtual_chapter, 82 zone->newest_virtual_chapter, virtual_chapter); 83 } 84 85 static int launch_zone_message(struct uds_zone_message message, unsigned int zone, 86 struct uds_index *index) 87 { 88 int result; 89 struct uds_request *request; 90 91 result = vdo_allocate(1, __func__, &request); 92 if (result != VDO_SUCCESS) 93 return result; 94 95 request->index = index; 96 request->unbatched = true; 97 request->zone_number = zone; 98 request->zone_message = message; 99 100 uds_enqueue_request(request, STAGE_MESSAGE); 101 return UDS_SUCCESS; 102 } 103 104 static void enqueue_barrier_messages(struct uds_index *index, u64 virtual_chapter) 105 { 106 struct uds_zone_message message = { 107 .type = UDS_MESSAGE_SPARSE_CACHE_BARRIER, 108 .virtual_chapter = virtual_chapter, 109 }; 110 unsigned int zone; 111 112 for (zone = 0; zone < index->zone_count; zone++) { 113 int result = launch_zone_message(message, zone, index); 114 115 VDO_ASSERT_LOG_ONLY((result == UDS_SUCCESS), "barrier message allocation"); 116 } 117 } 118 119 /* 120 * Determine whether this request should trigger a sparse cache barrier message to change the 121 * membership of the sparse cache. If a change in membership is desired, the function returns the 122 * chapter number to add. 123 */ 124 static u64 triage_index_request(struct uds_index *index, struct uds_request *request) 125 { 126 u64 virtual_chapter; 127 struct index_zone *zone; 128 129 virtual_chapter = uds_lookup_volume_index_name(index->volume_index, 130 &request->record_name); 131 if (virtual_chapter == NO_CHAPTER) 132 return NO_CHAPTER; 133 134 zone = index->zones[request->zone_number]; 135 if (!is_zone_chapter_sparse(zone, virtual_chapter)) 136 return NO_CHAPTER; 137 138 /* 139 * FIXME: Optimize for a common case by remembering the chapter from the most recent 140 * barrier message and skipping this chapter if is it the same. 
141 */ 142 143 return virtual_chapter; 144 } 145 146 /* 147 * Simulate a message to change the sparse cache membership for a single-zone sparse index. This 148 * allows us to forgo the complicated locking required by a multi-zone sparse index. Any other kind 149 * of index does nothing here. 150 */ 151 static int simulate_index_zone_barrier_message(struct index_zone *zone, 152 struct uds_request *request) 153 { 154 u64 sparse_virtual_chapter; 155 156 if ((zone->index->zone_count > 1) || 157 !uds_is_sparse_index_geometry(zone->index->volume->geometry)) 158 return UDS_SUCCESS; 159 160 sparse_virtual_chapter = triage_index_request(zone->index, request); 161 if (sparse_virtual_chapter == NO_CHAPTER) 162 return UDS_SUCCESS; 163 164 return uds_update_sparse_cache(zone, sparse_virtual_chapter); 165 } 166 167 /* This is the request processing function for the triage queue. */ 168 static void triage_request(struct uds_request *request) 169 { 170 struct uds_index *index = request->index; 171 u64 sparse_virtual_chapter = triage_index_request(index, request); 172 173 if (sparse_virtual_chapter != NO_CHAPTER) 174 enqueue_barrier_messages(index, sparse_virtual_chapter); 175 176 uds_enqueue_request(request, STAGE_INDEX); 177 } 178 179 static int finish_previous_chapter(struct uds_index *index, u64 current_chapter_number) 180 { 181 int result; 182 struct chapter_writer *writer = index->chapter_writer; 183 184 mutex_lock(&writer->mutex); 185 while (index->newest_virtual_chapter < current_chapter_number) 186 uds_wait_cond(&writer->cond, &writer->mutex); 187 result = writer->result; 188 mutex_unlock(&writer->mutex); 189 190 if (result != UDS_SUCCESS) 191 return vdo_log_error_strerror(result, 192 "Writing of previous open chapter failed"); 193 194 return UDS_SUCCESS; 195 } 196 197 static int swap_open_chapter(struct index_zone *zone) 198 { 199 int result; 200 201 result = finish_previous_chapter(zone->index, zone->newest_virtual_chapter); 202 if (result != UDS_SUCCESS) 203 return 
/*
 * Inform the chapter writer that this zone is done with this chapter. The chapter won't start
 * writing until all zones have closed it.
 */
static unsigned int start_closing_chapter(struct uds_index *index,
					  unsigned int zone_number,
					  struct open_chapter_zone *chapter)
{
	unsigned int finished_zones;
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	finished_zones = ++writer->zones_to_write;
	writer->chapters[zone_number] = chapter;
	/* Wake the writer thread, which waits for every zone to submit its chapter. */
	uds_broadcast_cond(&writer->cond);
	mutex_unlock(&writer->mutex);

	return finished_zones;
}

/* Tell every other zone that this zone has closed the given chapter. */
static int announce_chapter_closed(struct index_zone *zone, u64 closed_chapter)
{
	int result;
	unsigned int i;
	struct uds_zone_message zone_message = {
		.type = UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED,
		.virtual_chapter = closed_chapter,
	};

	for (i = 0; i < zone->index->zone_count; i++) {
		/* No need to notify the zone that closed the chapter. */
		if (zone->id == i)
			continue;

		result = launch_zone_message(zone_message, i, zone->index);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}

/* Close this zone's open chapter and open the next one. */
static int open_next_chapter(struct index_zone *zone)
{
	int result;
	u64 closed_chapter;
	u64 expiring;
	unsigned int finished_zones;
	u32 expire_chapters;

	vdo_log_debug("closing chapter %llu of zone %u after %u entries (%u short)",
		      (unsigned long long) zone->newest_virtual_chapter, zone->id,
		      zone->open_chapter->size,
		      zone->open_chapter->capacity - zone->open_chapter->size);

	result = swap_open_chapter(zone);
	if (result != UDS_SUCCESS)
		return result;

	closed_chapter = zone->newest_virtual_chapter++;
	uds_set_volume_index_zone_open_chapter(zone->index->volume_index, zone->id,
					       zone->newest_virtual_chapter);
	uds_reset_open_chapter(zone->open_chapter);

	finished_zones = start_closing_chapter(zone->index, zone->id,
					       zone->writing_chapter);
	/* The first zone to close the chapter announces it to the other zones. */
	if ((finished_zones == 1) && (zone->index->zone_count > 1)) {
		result = announce_chapter_closed(zone, closed_chapter);
		if (result != UDS_SUCCESS)
			return result;
	}

	expiring = zone->oldest_virtual_chapter;
	expire_chapters = uds_chapters_to_expire(zone->index->volume->geometry,
						 zone->newest_virtual_chapter);
	zone->oldest_virtual_chapter += expire_chapters;

	if (finished_zones < zone->index->zone_count)
		return UDS_SUCCESS;

	/* The last zone to close the chapter removes the expired chapters. */
	while (expire_chapters-- > 0)
		uds_forget_chapter(zone->index->volume, expiring++);

	return UDS_SUCCESS;
}

/* Open the next chapter if this zone has not already done so. */
static int handle_chapter_closed(struct index_zone *zone, u64 virtual_chapter)
{
	if (zone->newest_virtual_chapter == virtual_chapter)
		return open_next_chapter(zone);

	return UDS_SUCCESS;
}

/* Dispatch a zone control message to the appropriate handler. */
static int dispatch_index_zone_control_request(struct uds_request *request)
{
	struct uds_zone_message *message = &request->zone_message;
	struct index_zone *zone = request->index->zones[request->zone_number];

	switch (message->type) {
	case UDS_MESSAGE_SPARSE_CACHE_BARRIER:
		return uds_update_sparse_cache(zone, message->virtual_chapter);

	case UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED:
		return handle_chapter_closed(zone, message->virtual_chapter);

	default:
		vdo_log_error("invalid message type: %d", message->type);
		return UDS_INVALID_ARGUMENT;
	}
}

/* Record where the request's record was located, and whether that counts as found. */
static void set_request_location(struct uds_request *request,
				 enum uds_index_region new_location)
{
	request->location = new_location;
	request->found = ((new_location == UDS_LOCATION_IN_OPEN_CHAPTER) ||
			  (new_location == UDS_LOCATION_IN_DENSE) ||
			  (new_location == UDS_LOCATION_IN_SPARSE));
}

/* Note in the request which region of the index the record's chapter belongs to. */
static void set_chapter_location(struct uds_request *request,
				 const struct index_zone *zone, u64 virtual_chapter)
{
	request->found = true;
	if (virtual_chapter == zone->newest_virtual_chapter)
		request->location = UDS_LOCATION_IN_OPEN_CHAPTER;
	else if (is_zone_chapter_sparse(zone, virtual_chapter))
		request->location = UDS_LOCATION_IN_SPARSE;
	else
		request->location = UDS_LOCATION_IN_DENSE;
}

/* Search the sparse chapter cache for the request's record name. */
static int search_sparse_cache_in_zone(struct index_zone *zone, struct uds_request *request,
				       u64 virtual_chapter, bool *found)
{
	int result;
	struct volume *volume;
	u16 record_page_number;
	u32 chapter;

	result = uds_search_sparse_cache(zone, &request->record_name, &virtual_chapter,
					 &record_page_number);
	if ((result != UDS_SUCCESS) || (virtual_chapter == NO_CHAPTER))
		return result;

	request->virtual_chapter = virtual_chapter;
	volume = zone->index->volume;
	chapter = uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
	return uds_search_cached_record_page(volume, request, chapter,
					     record_page_number, found);
}

/* Look for the request's record in the chapter named by request->virtual_chapter. */
static int get_record_from_zone(struct index_zone *zone, struct uds_request *request,
				bool *found)
{
	struct volume *volume;

	if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
		*found = true;
		return UDS_SUCCESS;
	} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
		*found = false;
		return UDS_SUCCESS;
	}

	if (request->virtual_chapter == zone->newest_virtual_chapter) {
		uds_search_open_chapter(zone->open_chapter, &request->record_name,
					&request->old_metadata, found);
		return UDS_SUCCESS;
	}

	/* A recently closed chapter may still be in memory rather than on storage. */
	if ((zone->newest_virtual_chapter > 0) &&
	    (request->virtual_chapter == (zone->newest_virtual_chapter - 1)) &&
	    (zone->writing_chapter->size > 0)) {
		uds_search_open_chapter(zone->writing_chapter, &request->record_name,
					&request->old_metadata, found);
		return UDS_SUCCESS;
	}

	volume = zone->index->volume;
	if (is_zone_chapter_sparse(zone, request->virtual_chapter) &&
	    uds_sparse_cache_contains(volume->sparse_cache, request->virtual_chapter,
				      request->zone_number))
		return search_sparse_cache_in_zone(zone, request,
						   request->virtual_chapter, found);

	return uds_search_volume_page_cache(volume, request, found);
}
/* Add the record to the open chapter, closing the chapter if it becomes full. */
static int put_record_in_zone(struct index_zone *zone, struct uds_request *request,
			      const struct uds_record_data *metadata)
{
	unsigned int remaining;

	remaining = uds_put_open_chapter(zone->open_chapter, &request->record_name,
					 metadata);
	if (remaining == 0)
		return open_next_chapter(zone);

	return UDS_SUCCESS;
}

/* Search for the request's record and update the index as the request type demands. */
static int search_index_zone(struct index_zone *zone, struct uds_request *request)
{
	int result;
	struct volume_index_record record;
	bool overflow_record, found = false;
	struct uds_record_data *metadata;
	u64 chapter;

	result = uds_get_volume_index_record(zone->index->volume_index,
					     &request->record_name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (record.is_found) {
		/* If the request was requeued, check whether the saved state is still valid. */
		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
			set_request_location(request, UDS_LOCATION_UNKNOWN);

		request->virtual_chapter = record.virtual_chapter;
		result = get_record_from_zone(zone, request, &found);
		if (result != UDS_SUCCESS)
			return result;
	}

	if (found)
		set_chapter_location(request, zone, record.virtual_chapter);

	/*
	 * If a record has overflowed a chapter index in more than one chapter (or overflowed in
	 * one chapter and collided with an existing record), it will exist as a collision record
	 * in the volume index, but we won't find it in the volume. This case needs special
	 * handling.
	 */
	overflow_record = (record.is_found && record.is_collision && !found);
	chapter = zone->newest_virtual_chapter;
	if (found || overflow_record) {
		if ((request->type == UDS_QUERY_NO_UPDATE) ||
		    ((request->type == UDS_QUERY) && overflow_record)) {
			/* There is nothing left to do. */
			return UDS_SUCCESS;
		}

		if (record.virtual_chapter != chapter) {
			/*
			 * Update the volume index to reference the new chapter for the block. If
			 * the record had been deleted or dropped from the chapter index, it will
			 * be back.
			 */
			result = uds_set_volume_index_record_chapter(&record, chapter);
		} else if (request->type != UDS_UPDATE) {
			/* The record is already in the open chapter. */
			return UDS_SUCCESS;
		}
	} else {
		/*
		 * The record wasn't in the volume index, so check whether the
		 * name is in a cached sparse chapter. If we found the name on
		 * a previous search, use that result instead.
		 */
		if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
			found = true;
		} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
			found = false;
		} else if (uds_is_sparse_index_geometry(zone->index->volume->geometry) &&
			   !uds_is_volume_index_sample(zone->index->volume_index,
						       &request->record_name)) {
			result = search_sparse_cache_in_zone(zone, request, NO_CHAPTER,
							     &found);
			if (result != UDS_SUCCESS)
				return result;
		}

		if (found)
			set_request_location(request, UDS_LOCATION_IN_SPARSE);

		if ((request->type == UDS_QUERY_NO_UPDATE) ||
		    ((request->type == UDS_QUERY) && !found)) {
			/* There is nothing left to do. */
			return UDS_SUCCESS;
		}

		/*
		 * Add a new entry to the volume index referencing the open chapter. This needs to
		 * be done both for new records, and for records from cached sparse chapters.
		 */
		result = uds_put_volume_index_record(&record, chapter);
	}

	if (result == UDS_OVERFLOW) {
		/*
		 * The volume index encountered a delta list overflow. The condition was already
		 * logged. We will go on without adding the record to the open chapter.
		 */
		return UDS_SUCCESS;
	}

	if (result != UDS_SUCCESS)
		return result;

	if (!found || (request->type == UDS_UPDATE)) {
		/* This is a new record or we're updating an existing record. */
		metadata = &request->new_metadata;
	} else {
		/* Move the existing record to the open chapter. */
		metadata = &request->old_metadata;
	}

	return put_record_in_zone(zone, request, metadata);
}

/* Remove the request's record from the index, if it is present. */
static int remove_from_index_zone(struct index_zone *zone, struct uds_request *request)
{
	int result;
	struct volume_index_record record;

	result = uds_get_volume_index_record(zone->index->volume_index,
					     &request->record_name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (!record.is_found)
		return UDS_SUCCESS;

	/* If the request was requeued, check whether the saved state is still valid. */

	if (record.is_collision) {
		set_chapter_location(request, zone, record.virtual_chapter);
	} else {
		/* Non-collision records are hints, so resolve the name in the chapter. */
		bool found;

		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
			set_request_location(request, UDS_LOCATION_UNKNOWN);

		request->virtual_chapter = record.virtual_chapter;
		result = get_record_from_zone(zone, request, &found);
		if (result != UDS_SUCCESS)
			return result;

		if (!found) {
			/* There is no record to remove. */
			return UDS_SUCCESS;
		}
	}

	set_chapter_location(request, zone, record.virtual_chapter);

	/*
	 * Delete the volume index entry for the named record only. Note that a later search
	 * might return stale advice if there is a colliding name in the same chapter, but it's
	 * a very rare case (1 in 2^21).
	 */
	result = uds_remove_volume_index_record(&record);
	if (result != UDS_SUCCESS)
		return result;

	/*
	 * If the record is in the open chapter, we must remove it or mark it deleted to avoid
	 * trouble if the record is added again later.
	 */
	if (request->location == UDS_LOCATION_IN_OPEN_CHAPTER)
		uds_remove_from_open_chapter(zone->open_chapter, &request->record_name);

	return UDS_SUCCESS;
}
/* Process an index request according to its type. */
static int dispatch_index_request(struct uds_index *index, struct uds_request *request)
{
	int result;
	struct index_zone *zone = index->zones[request->zone_number];

	if (!request->requeued) {
		/* Single-zone sparse indexes have no triage queue to generate barriers. */
		result = simulate_index_zone_barrier_message(zone, request);
		if (result != UDS_SUCCESS)
			return result;
	}

	switch (request->type) {
	case UDS_POST:
	case UDS_UPDATE:
	case UDS_QUERY:
	case UDS_QUERY_NO_UPDATE:
		result = search_index_zone(zone, request);
		break;

	case UDS_DELETE:
		result = remove_from_index_zone(zone, request);
		break;

	default:
		result = vdo_log_warning_strerror(UDS_INVALID_ARGUMENT,
						  "invalid request type: %d",
						  request->type);
		break;
	}

	return result;
}

/* This is the request processing function invoked by each zone's thread. */
static void execute_zone_request(struct uds_request *request)
{
	int result;
	struct uds_index *index = request->index;

	if (request->zone_message.type != UDS_MESSAGE_NONE) {
		result = dispatch_index_zone_control_request(request);
		if (result != UDS_SUCCESS) {
			vdo_log_error_strerror(result, "error executing message: %d",
					       request->zone_message.type);
		}

		/* Once the message is processed it can be freed. */
		vdo_free(vdo_forget(request));
		return;
	}

	index->need_to_save = true;
	if (request->requeued && (request->status != UDS_SUCCESS)) {
		/* A requeued request that already failed just completes with an error. */
		set_request_location(request, UDS_LOCATION_UNAVAILABLE);
		index->callback(request);
		return;
	}

	result = dispatch_index_request(index, request);
	if (result == UDS_QUEUED) {
		/* The request has been requeued so don't let it complete. */
		return;
	}

	if (!request->found)
		set_request_location(request, UDS_LOCATION_UNAVAILABLE);

	request->status = result;
	index->callback(request);
}

/* Create the per-zone request queues and, if needed, the triage queue. */
static int initialize_index_queues(struct uds_index *index,
				   const struct index_geometry *geometry)
{
	int result;
	unsigned int i;

	for (i = 0; i < index->zone_count; i++) {
		result = uds_make_request_queue("indexW", &execute_zone_request,
						&index->zone_queues[i]);
		if (result != UDS_SUCCESS)
			return result;
	}

	/* The triage queue is only needed for sparse multi-zone indexes. */
	if ((index->zone_count > 1) && uds_is_sparse_index_geometry(geometry)) {
		result = uds_make_request_queue("triageW", &triage_request,
						&index->triage_queue);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}

/* This is the driver function for the chapter writer thread. */
static void close_chapters(void *arg)
{
	int result;
	struct chapter_writer *writer = arg;
	struct uds_index *index = writer->index;

	vdo_log_debug("chapter writer starting");
	mutex_lock(&writer->mutex);
	for (;;) {
		while (writer->zones_to_write < index->zone_count) {
			if (writer->stop && (writer->zones_to_write == 0)) {
				/*
				 * We've been told to stop, and all of the zones are in the same
				 * open chapter, so we can exit now.
				 */
				mutex_unlock(&writer->mutex);
				vdo_log_debug("chapter writer stopping");
				return;
			}
			uds_wait_cond(&writer->cond, &writer->mutex);
		}

		/*
		 * Release the lock while closing a chapter. We probably don't need to do this, but
		 * it seems safer in principle. It's OK to access the chapter and chapter_number
		 * fields without the lock since those aren't allowed to change until we're done.
		 */
		mutex_unlock(&writer->mutex);

		if (index->has_saved_open_chapter) {
			/*
			 * Remove the saved open chapter the first time we close an open chapter
			 * after loading from a clean shutdown, or after doing a clean save. The
			 * lack of the saved open chapter will indicate that a recovery is
			 * necessary.
			 */
			index->has_saved_open_chapter = false;
			result = uds_discard_open_chapter(index->layout);
			if (result == UDS_SUCCESS)
				vdo_log_debug("Discarding saved open chapter");
		}

		result = uds_close_open_chapter(writer->chapters, index->zone_count,
						index->volume,
						writer->open_chapter_index,
						writer->collated_records,
						index->newest_virtual_chapter);

		mutex_lock(&writer->mutex);
		index->newest_virtual_chapter++;
		index->oldest_virtual_chapter +=
			uds_chapters_to_expire(index->volume->geometry,
					       index->newest_virtual_chapter);
		writer->result = result;
		writer->zones_to_write = 0;
		/* Wake any zone waiting in finish_previous_chapter(). */
		uds_broadcast_cond(&writer->cond);
	}
}

/* Tell the chapter writer thread to stop and wait for it to exit. */
static void stop_chapter_writer(struct chapter_writer *writer)
{
	struct thread *writer_thread = NULL;

	mutex_lock(&writer->mutex);
	if (writer->thread != NULL) {
		writer_thread = writer->thread;
		writer->thread = NULL;
		writer->stop = true;
		uds_broadcast_cond(&writer->cond);
	}
	mutex_unlock(&writer->mutex);

	if (writer_thread != NULL)
		vdo_join_threads(writer_thread);
}

/* Stop the chapter writer and release all of its resources. */
static void free_chapter_writer(struct chapter_writer *writer)
{
	if (writer == NULL)
		return;

	stop_chapter_writer(writer);
	uds_free_open_chapter_index(writer->open_chapter_index);
	vdo_free(writer->collated_records);
	vdo_free(writer);
}
return; 752 753 stop_chapter_writer(writer); 754 uds_free_open_chapter_index(writer->open_chapter_index); 755 vdo_free(writer->collated_records); 756 vdo_free(writer); 757 } 758 759 static int make_chapter_writer(struct uds_index *index, 760 struct chapter_writer **writer_ptr) 761 { 762 int result; 763 struct chapter_writer *writer; 764 size_t collated_records_size = 765 (sizeof(struct uds_volume_record) * index->volume->geometry->records_per_chapter); 766 767 result = vdo_allocate_extended(index->zone_count, chapters, "Chapter Writer", &writer); 768 if (result != VDO_SUCCESS) 769 return result; 770 771 writer->index = index; 772 mutex_init(&writer->mutex); 773 uds_init_cond(&writer->cond); 774 775 result = vdo_allocate_cache_aligned(collated_records_size, "collated records", 776 &writer->collated_records); 777 if (result != VDO_SUCCESS) { 778 free_chapter_writer(writer); 779 return result; 780 } 781 782 result = uds_make_open_chapter_index(&writer->open_chapter_index, 783 index->volume->geometry, 784 index->volume->nonce); 785 if (result != UDS_SUCCESS) { 786 free_chapter_writer(writer); 787 return result; 788 } 789 790 writer->memory_size = (sizeof(struct chapter_writer) + 791 index->zone_count * sizeof(struct open_chapter_zone *) + 792 collated_records_size + 793 writer->open_chapter_index->memory_size); 794 795 result = vdo_create_thread(close_chapters, writer, "writer", &writer->thread); 796 if (result != VDO_SUCCESS) { 797 free_chapter_writer(writer); 798 return result; 799 } 800 801 *writer_ptr = writer; 802 return UDS_SUCCESS; 803 } 804 805 static int load_index(struct uds_index *index) 806 { 807 int result; 808 u64 last_save_chapter; 809 810 result = uds_load_index_state(index->layout, index); 811 if (result != UDS_SUCCESS) 812 return UDS_INDEX_NOT_SAVED_CLEANLY; 813 814 last_save_chapter = ((index->last_save != NO_LAST_SAVE) ? 
index->last_save : 0); 815 816 vdo_log_info("loaded index from chapter %llu through chapter %llu", 817 (unsigned long long) index->oldest_virtual_chapter, 818 (unsigned long long) last_save_chapter); 819 820 return UDS_SUCCESS; 821 } 822 823 static int rebuild_index_page_map(struct uds_index *index, u64 vcn) 824 { 825 int result; 826 struct delta_index_page *chapter_index_page; 827 struct index_geometry *geometry = index->volume->geometry; 828 u32 chapter = uds_map_to_physical_chapter(geometry, vcn); 829 u32 expected_list_number = 0; 830 u32 index_page_number; 831 u32 lowest_delta_list; 832 u32 highest_delta_list; 833 834 for (index_page_number = 0; 835 index_page_number < geometry->index_pages_per_chapter; 836 index_page_number++) { 837 result = uds_get_volume_index_page(index->volume, chapter, 838 index_page_number, 839 &chapter_index_page); 840 if (result != UDS_SUCCESS) { 841 return vdo_log_error_strerror(result, 842 "failed to read index page %u in chapter %u", 843 index_page_number, chapter); 844 } 845 846 lowest_delta_list = chapter_index_page->lowest_list_number; 847 highest_delta_list = chapter_index_page->highest_list_number; 848 if (lowest_delta_list != expected_list_number) { 849 return vdo_log_error_strerror(UDS_CORRUPT_DATA, 850 "chapter %u index page %u is corrupt", 851 chapter, index_page_number); 852 } 853 854 uds_update_index_page_map(index->volume->index_page_map, vcn, chapter, 855 index_page_number, highest_delta_list); 856 expected_list_number = highest_delta_list + 1; 857 } 858 859 return UDS_SUCCESS; 860 } 861 862 static int replay_record(struct uds_index *index, const struct uds_record_name *name, 863 u64 virtual_chapter, bool will_be_sparse_chapter) 864 { 865 int result; 866 struct volume_index_record record; 867 bool update_record; 868 869 if (will_be_sparse_chapter && 870 !uds_is_volume_index_sample(index->volume_index, name)) { 871 /* 872 * This entry will be in a sparse chapter after the rebuild completes, and it is 873 * not a sample, 
so just skip over it. 874 */ 875 return UDS_SUCCESS; 876 } 877 878 result = uds_get_volume_index_record(index->volume_index, name, &record); 879 if (result != UDS_SUCCESS) 880 return result; 881 882 if (record.is_found) { 883 if (record.is_collision) { 884 if (record.virtual_chapter == virtual_chapter) { 885 /* The record is already correct. */ 886 return UDS_SUCCESS; 887 } 888 889 update_record = true; 890 } else if (record.virtual_chapter == virtual_chapter) { 891 /* 892 * There is a volume index entry pointing to the current chapter, but we 893 * don't know if it is for the same name as the one we are currently 894 * working on or not. For now, we're just going to assume that it isn't. 895 * This will create one extra collision record if there was a deleted 896 * record in the current chapter. 897 */ 898 update_record = false; 899 } else { 900 /* 901 * If we're rebuilding, we don't normally want to go to disk to see if the 902 * record exists, since we will likely have just read the record from disk 903 * (i.e. we know it's there). The exception to this is when we find an 904 * entry in the volume index that has a different chapter. In this case, we 905 * need to search that chapter to determine if the volume index entry was 906 * for the same record or a different one. 907 */ 908 result = uds_search_volume_page_cache_for_rebuild(index->volume, 909 name, 910 record.virtual_chapter, 911 &update_record); 912 if (result != UDS_SUCCESS) 913 return result; 914 } 915 } else { 916 update_record = false; 917 } 918 919 if (update_record) { 920 /* 921 * Update the volume index to reference the new chapter for the block. If the 922 * record had been deleted or dropped from the chapter index, it will be back. 923 */ 924 result = uds_set_volume_index_record_chapter(&record, virtual_chapter); 925 } else { 926 /* 927 * Add a new entry to the volume index referencing the open chapter. 
This should be 928 * done regardless of whether we are a brand new record or a sparse record, i.e. 929 * one that doesn't exist in the index but does on disk, since for a sparse record, 930 * we would want to un-sparsify if it did exist. 931 */ 932 result = uds_put_volume_index_record(&record, virtual_chapter); 933 } 934 935 if ((result == UDS_DUPLICATE_NAME) || (result == UDS_OVERFLOW)) { 936 /* The rebuilt index will lose these records. */ 937 return UDS_SUCCESS; 938 } 939 940 return result; 941 } 942 943 static bool check_for_suspend(struct uds_index *index) 944 { 945 bool closing; 946 947 if (index->load_context == NULL) 948 return false; 949 950 mutex_lock(&index->load_context->mutex); 951 if (index->load_context->status != INDEX_SUSPENDING) { 952 mutex_unlock(&index->load_context->mutex); 953 return false; 954 } 955 956 /* Notify that we are suspended and wait for the resume. */ 957 index->load_context->status = INDEX_SUSPENDED; 958 uds_broadcast_cond(&index->load_context->cond); 959 960 while ((index->load_context->status != INDEX_OPENING) && 961 (index->load_context->status != INDEX_FREEING)) 962 uds_wait_cond(&index->load_context->cond, &index->load_context->mutex); 963 964 closing = (index->load_context->status == INDEX_FREEING); 965 mutex_unlock(&index->load_context->mutex); 966 return closing; 967 } 968 969 static int replay_chapter(struct uds_index *index, u64 virtual, bool sparse) 970 { 971 int result; 972 u32 i; 973 u32 j; 974 const struct index_geometry *geometry; 975 u32 physical_chapter; 976 977 if (check_for_suspend(index)) { 978 vdo_log_info("Replay interrupted by index shutdown at chapter %llu", 979 (unsigned long long) virtual); 980 return -EBUSY; 981 } 982 983 geometry = index->volume->geometry; 984 physical_chapter = uds_map_to_physical_chapter(geometry, virtual); 985 uds_prefetch_volume_chapter(index->volume, physical_chapter); 986 uds_set_volume_index_open_chapter(index->volume_index, virtual); 987 988 result = 
rebuild_index_page_map(index, virtual);
	if (result != UDS_SUCCESS) {
		return vdo_log_error_strerror(result,
					      "could not rebuild index page map for chapter %u",
					      physical_chapter);
	}

	/* Walk every record page of the chapter and re-add each record name. */
	for (i = 0; i < geometry->record_pages_per_chapter; i++) {
		u8 *record_page;
		u32 record_page_number;

		/* Record pages follow the index pages within each chapter. */
		record_page_number = geometry->index_pages_per_chapter + i;
		result = uds_get_volume_record_page(index->volume, physical_chapter,
						    record_page_number, &record_page);
		if (result != UDS_SUCCESS) {
			return vdo_log_error_strerror(result, "could not get page %d",
						      record_page_number);
		}

		for (j = 0; j < geometry->records_per_page; j++) {
			const u8 *name_bytes;
			struct uds_record_name name;

			name_bytes = record_page + (j * BYTES_PER_RECORD);
			memcpy(&name.name, name_bytes, UDS_RECORD_NAME_SIZE);
			result = replay_record(index, &name, virtual, sparse);
			if (result != UDS_SUCCESS)
				return result;
		}
	}

	return UDS_SUCCESS;
}

/* Repopulate the volume index by replaying every closed chapter of the volume in order. */
static int replay_volume(struct uds_index *index)
{
	int result;
	u64 old_map_update;
	u64 new_map_update;
	u64 virtual;
	u64 from_virtual = index->oldest_virtual_chapter;
	u64 upto_virtual = index->newest_virtual_chapter;
	bool will_be_sparse;

	vdo_log_info("Replaying volume from chapter %llu through chapter %llu",
		     (unsigned long long) from_virtual,
		     (unsigned long long) upto_virtual);

	/*
	 * The index failed to load, so the volume index is empty. Add records to the volume index
	 * in order, skipping non-hooks in chapters which will be sparse to save time.
	 *
	 * Go through each record page of each chapter and add the records back to the volume
	 * index. This should not cause anything to be written to either the open chapter or the
	 * on-disk volume.
Also skip the on-disk chapter corresponding to upto_virtual, as this
	 * would have already been purged from the volume index when the chapter was opened.
	 *
	 * Also, go through each index page for each chapter and rebuild the index page map.
	 */
	old_map_update = index->volume->index_page_map->last_update;
	for (virtual = from_virtual; virtual < upto_virtual; virtual++) {
		will_be_sparse = uds_is_chapter_sparse(index->volume->geometry,
						       from_virtual, upto_virtual,
						       virtual);
		result = replay_chapter(index, virtual, will_be_sparse);
		if (result != UDS_SUCCESS)
			return result;
	}

	/* Also reap the chapter being replaced by the open chapter. */
	uds_set_volume_index_open_chapter(index->volume_index, upto_virtual);

	new_map_update = index->volume->index_page_map->last_update;
	if (new_map_update != old_map_update) {
		vdo_log_info("replay changed index page map update from %llu to %llu",
			     (unsigned long long) old_map_update,
			     (unsigned long long) new_map_update);
	}

	return UDS_SUCCESS;
}

/*
 * Reconstruct the in-memory index state from the on-disk volume after a failed load: find the
 * range of valid chapters on disk and replay them into the volume index.
 */
static int rebuild_index(struct uds_index *index)
{
	int result;
	u64 lowest;
	u64 highest;
	bool is_empty = false;
	u32 chapters_per_volume = index->volume->geometry->chapters_per_volume;

	index->volume->lookup_mode = LOOKUP_FOR_REBUILD;
	result = uds_find_volume_chapter_boundaries(index->volume, &lowest, &highest,
						    &is_empty);
	if (result != UDS_SUCCESS) {
		return vdo_log_fatal_strerror(result,
					      "cannot rebuild index: unknown volume chapter boundaries");
	}

	if (is_empty) {
		/* An empty volume rebuilds to an empty index. */
		index->newest_virtual_chapter = 0;
		index->oldest_virtual_chapter = 0;
		index->volume->lookup_mode = LOOKUP_NORMAL;
		return UDS_SUCCESS;
	}

	index->newest_virtual_chapter = highest + 1;
	index->oldest_virtual_chapter = lowest;
	if (index->newest_virtual_chapter ==
	    (index->oldest_virtual_chapter + chapters_per_volume)) {
		/* Skip the chapter shadowed by the open chapter. */
		index->oldest_virtual_chapter++;
	}

	result = replay_volume(index);
	if (result != UDS_SUCCESS)
		return result;

	index->volume->lookup_mode = LOOKUP_NORMAL;
	return UDS_SUCCESS;
}

/* Free an index zone and the open chapters it owns. A NULL zone is a safe no-op. */
static void free_index_zone(struct index_zone *zone)
{
	if (zone == NULL)
		return;

	uds_free_open_chapter(zone->open_chapter);
	uds_free_open_chapter(zone->writing_chapter);
	vdo_free(zone);
}

/*
 * Allocate and initialize one index zone, storing it in index->zones[zone_number]. On failure,
 * any partially constructed zone is freed before returning.
 */
static int make_index_zone(struct uds_index *index, unsigned int zone_number)
{
	int result;
	struct index_zone *zone;

	result = vdo_allocate(1, "index zone", &zone);
	if (result != VDO_SUCCESS)
		return result;

	result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
				       &zone->open_chapter);
	if (result != UDS_SUCCESS) {
		free_index_zone(zone);
		return result;
	}

	result = uds_make_open_chapter(index->volume->geometry, index->zone_count,
				       &zone->writing_chapter);
	if (result != UDS_SUCCESS) {
		free_index_zone(zone);
		return result;
	}

	zone->index = index;
	zone->id = zone_number;
	index->zones[zone_number] = zone;

	return UDS_SUCCESS;
}

/*
 * Allocate and initialize a uds_index. Depending on open_type, either create a new index or load
 * (and, for UDS_LOAD, rebuild if the load fails) an existing one. On success, *new_index holds
 * the fully initialized index; on any failure, all partially constructed state is freed.
 */
int uds_make_index(struct uds_configuration *config, enum uds_open_index_type open_type,
		   struct index_load_context *load_context, index_callback_fn callback,
		   struct uds_index **new_index)
{
	int result;
	bool loaded = false;
	bool new = (open_type == UDS_CREATE);
	struct uds_index *index = NULL;
	struct index_zone *zone;
	u64 nonce;
	unsigned int z;

	result = vdo_allocate_extended(config->zone_count, zone_queues, "index", &index);
	if (result != VDO_SUCCESS)
		return result;

	index->zone_count = config->zone_count;

	result =
uds_make_index_layout(config, new, &index->layout);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = vdo_allocate(index->zone_count, "zones", &index->zones);
	if (result != VDO_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = uds_make_volume(config, index->layout, &index->volume);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	index->volume->lookup_mode = LOOKUP_NORMAL;
	for (z = 0; z < index->zone_count; z++) {
		result = make_index_zone(index, z);
		if (result != UDS_SUCCESS) {
			uds_free_index(index);
			return vdo_log_error_strerror(result,
						      "Could not create index zone");
		}
	}

	nonce = uds_get_volume_nonce(index->layout);
	result = uds_make_volume_index(config, nonce, &index->volume_index);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return vdo_log_error_strerror(result, "could not make volume index");
	}

	index->load_context = load_context;
	index->callback = callback;

	result = initialize_index_queues(index, config->geometry);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	result = make_chapter_writer(index, &index->chapter_writer);
	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return result;
	}

	if (!new) {
		result = load_index(index);
		switch (result) {
		case UDS_SUCCESS:
			loaded = true;
			break;
		case -ENOMEM:
			/* We should not try a rebuild for this error. */
			vdo_log_error_strerror(result, "index could not be loaded");
			break;
		default:
			vdo_log_error_strerror(result, "index could not be loaded");
			if (open_type == UDS_LOAD) {
				result = rebuild_index(index);
				if (result != UDS_SUCCESS) {
					vdo_log_error_strerror(result,
							       "index could not be rebuilt");
				}
			}
			break;
		}
	}

	if (result != UDS_SUCCESS) {
		uds_free_index(index);
		return vdo_log_error_strerror(result, "fatal error in %s()", __func__);
	}

	/* Propagate the chapter range to each zone before any requests are processed. */
	for (z = 0; z < index->zone_count; z++) {
		zone = index->zones[z];
		zone->oldest_virtual_chapter = index->oldest_virtual_chapter;
		zone->newest_virtual_chapter = index->newest_virtual_chapter;
	}

	if (index->load_context != NULL) {
		mutex_lock(&index->load_context->mutex);
		index->load_context->status = INDEX_READY;
		/*
		 * If we get here, suspend is meaningless, but notify any thread trying to suspend
		 * us so it doesn't hang.
		 */
		uds_broadcast_cond(&index->load_context->cond);
		mutex_unlock(&index->load_context->mutex);
	}

	index->has_saved_open_chapter = loaded;
	index->need_to_save = !loaded;
	*new_index = index;
	return UDS_SUCCESS;
}

/* Free an index and every structure it owns. A NULL index is a safe no-op. */
void uds_free_index(struct uds_index *index)
{
	unsigned int i;

	if (index == NULL)
		return;

	/* Shut down the request queues before freeing the structures they reference. */
	uds_request_queue_finish(index->triage_queue);
	for (i = 0; i < index->zone_count; i++)
		uds_request_queue_finish(index->zone_queues[i]);

	free_chapter_writer(index->chapter_writer);

	uds_free_volume_index(index->volume_index);
	if (index->zones != NULL) {
		for (i = 0; i < index->zone_count; i++)
			free_index_zone(index->zones[i]);
		vdo_free(index->zones);
	}

	uds_free_volume(index->volume);
	uds_free_index_layout(vdo_forget(index->layout));
	vdo_free(index);
}

/* Wait for the chapter writer to complete any outstanding writes. */
void uds_wait_for_idle_index(struct uds_index *index)
{
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	while (writer->zones_to_write > 0)
		uds_wait_cond(&writer->cond, &writer->mutex);
	mutex_unlock(&writer->mutex);
}

/* This function assumes that all requests have been drained. */
int uds_save_index(struct uds_index *index)
{
	int result;

	if (!index->need_to_save)
		return UDS_SUCCESS;

	uds_wait_for_idle_index(index);
	index->prev_save = index->last_save;
	index->last_save = ((index->newest_virtual_chapter == 0) ?
			    NO_LAST_SAVE : index->newest_virtual_chapter - 1);
	vdo_log_info("beginning save (vcn %llu)", (unsigned long long) index->last_save);

	result = uds_save_index_state(index->layout, index);
	if (result != UDS_SUCCESS) {
		/* Restore the previous save point so a later save can be retried. */
		vdo_log_info("save index failed");
		index->last_save = index->prev_save;
	} else {
		index->has_saved_open_chapter = true;
		index->need_to_save = false;
		vdo_log_info("finished save (vcn %llu)",
			     (unsigned long long) index->last_save);
	}

	return result;
}

/* Switch the index volume to a new backing block device. */
int uds_replace_index_storage(struct uds_index *index, struct block_device *bdev)
{
	return uds_replace_volume_storage(index->volume, index->layout, bdev);
}

/* Accessing statistics should be safe from any thread. */
void uds_get_index_stats(struct uds_index *index, struct uds_index_stats *counters)
{
	struct volume_index_stats stats;

	uds_get_volume_index_stats(index->volume_index, &stats);
	counters->entries_indexed = stats.record_count;
	counters->collisions = stats.collision_count;
	counters->entries_discarded = stats.discard_count;

	counters->memory_used = (index->volume_index->memory_size +
				 index->volume->cache_size +
				 index->chapter_writer->memory_size);
}

/* Route a request to the appropriate queue for the given processing stage. */
void uds_enqueue_request(struct uds_request *request, enum request_stage stage)
{
	struct uds_index *index = request->index;
	struct uds_request_queue *queue;

	switch (stage) {
	case STAGE_TRIAGE:
		/* If there is no triage queue, fall through to the index stage. */
		if (index->triage_queue != NULL) {
			queue = index->triage_queue;
			break;
		}

		fallthrough;

	case STAGE_INDEX:
		request->zone_number =
			uds_get_volume_index_zone(index->volume_index, &request->record_name);
		fallthrough;

	case STAGE_MESSAGE:
		queue = index->zone_queues[request->zone_number];
		break;

	default:
		VDO_ASSERT_LOG_ONLY(false, "invalid index stage: %d", stage);
		return;
	}

	uds_request_queue_enqueue(queue, request);
}