// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */


#include "index.h"

#include "logger.h"
#include "memory-alloc.h"

#include "funnel-requestqueue.h"
#include "hash-utils.h"
#include "sparse-cache.h"

static const u64 NO_LAST_SAVE = U64_MAX;

/*
 * When searching for deduplication records, the index first searches the volume index, and then
 * searches the chapter index for the relevant chapter. If the chapter has been fully committed to
 * storage, the chapter pages are loaded into the page cache. If the chapter has not yet been
 * committed (either the open chapter or a recently closed one), the index searches the in-memory
 * representation of the chapter. Finally, if the volume index does not find a record and the index
 * is sparse, the index will search the sparse cache.
 *
 * The index sends two kinds of messages to coordinate between zones: chapter close messages for
 * the chapter writer, and sparse cache barrier messages for the sparse cache.
 *
 * The chapter writer is responsible for committing chapters of records to storage. Since zones can
 * get different numbers of records, some zones may fall behind others. Each time a zone fills up
 * its available space in a chapter, it informs the chapter writer that the chapter is complete,
 * and also informs all other zones that it has closed the chapter. Each other zone will then close
 * the chapter immediately, regardless of how full it is, in order to minimize skew between zones.
 * Once every zone has closed the chapter, the chapter writer will commit that chapter to storage.
 *
 * The last zone to close the chapter also removes the oldest chapter from the volume index.
 * Although that chapter is invalid for zones that have moved on, the existence of the open chapter
 * means that those zones will never ask the volume index about it.
 * No zone is allowed to get more
 * than one chapter ahead of any other. If a zone is so far ahead that it tries to close another
 * chapter before the previous one has been closed by all zones, it is forced to wait.
 *
 * The sparse cache relies on having the same set of chapter indexes available to all zones. When a
 * request wants to add a chapter to the sparse cache, it sends a barrier message to each zone
 * during the triage stage that acts as a rendezvous. Once every zone has reached the barrier and
 * paused its operations, the cache membership is changed and each zone is then informed that it
 * can proceed. More details can be found in the sparse cache documentation.
 *
 * If a sparse cache has only one zone, it will not create a triage queue, but it still needs the
 * barrier message to change the sparse cache membership, so the index simulates the message by
 * invoking the handler directly.
 */

struct chapter_writer {
	/* The index to which we belong */
	struct uds_index *index;
	/* The thread to do the writing */
	struct thread *thread;
	/* The lock protecting the following fields */
	struct mutex mutex;
	/* The condition signalled on state changes */
	struct cond_var cond;
	/* Set to true to stop the thread */
	bool stop;
	/* The result from the most recent write */
	int result;
	/* The number of bytes allocated by the chapter writer */
	size_t memory_size;
	/* The number of zones which have submitted a chapter for writing */
	unsigned int zones_to_write;
	/* Open chapter index used by uds_close_open_chapter() */
	struct open_chapter_index *open_chapter_index;
	/* Collated records used by uds_close_open_chapter() */
	struct uds_volume_record *collated_records;
	/* The chapters to write (one per zone) */
	struct open_chapter_zone *chapters[];
};

/* Check whether a chapter is sparse from the point of view of the given zone. */
static bool is_zone_chapter_sparse(const struct index_zone *zone, u64 virtual_chapter)
{
	return uds_is_chapter_sparse(zone->index->volume->geometry,
				     zone->oldest_virtual_chapter,
				     zone->newest_virtual_chapter, virtual_chapter);
}

/* Allocate a control-message request and enqueue it for the specified zone. */
static int launch_zone_message(struct uds_zone_message message, unsigned int zone,
			       struct uds_index *index)
{
	int result;
	struct uds_request *request;

	result = vdo_allocate(1, struct uds_request, __func__, &request);
	if (result != VDO_SUCCESS)
		return result;

	request->index = index;
	/* Control messages bypass the normal request batching. */
	request->unbatched = true;
	request->zone_number = zone;
	request->zone_message = message;

	uds_enqueue_request(request, STAGE_MESSAGE);
	return UDS_SUCCESS;
}

/* Send a sparse cache barrier message for virtual_chapter to every zone. */
static void enqueue_barrier_messages(struct uds_index *index, u64 virtual_chapter)
{
	struct uds_zone_message message = {
		.type = UDS_MESSAGE_SPARSE_CACHE_BARRIER,
		.virtual_chapter = virtual_chapter,
	};
	unsigned int zone;

	for (zone = 0; zone < index->zone_count; zone++) {
		int result = launch_zone_message(message, zone, index);

		VDO_ASSERT_LOG_ONLY((result == UDS_SUCCESS), "barrier message allocation");
	}
}

/*
 * Determine whether this request should trigger a sparse cache barrier message to change the
 * membership of the sparse cache. If a change in membership is desired, the function returns the
 * chapter number to add.
 */
static u64 triage_index_request(struct uds_index *index, struct uds_request *request)
{
	u64 virtual_chapter;
	struct index_zone *zone;

	virtual_chapter = uds_lookup_volume_index_name(index->volume_index,
						       &request->record_name);
	if (virtual_chapter == NO_CHAPTER)
		return NO_CHAPTER;

	zone = index->zones[request->zone_number];
	if (!is_zone_chapter_sparse(zone, virtual_chapter))
		return NO_CHAPTER;

	/*
	 * FIXME: Optimize for a common case by remembering the chapter from the most recent
	 * barrier message and skipping this chapter if it is the same.
	 */

	return virtual_chapter;
}

/*
 * Simulate a message to change the sparse cache membership for a single-zone sparse index. This
 * allows us to forgo the complicated locking required by a multi-zone sparse index. Any other kind
 * of index does nothing here.
 */
static int simulate_index_zone_barrier_message(struct index_zone *zone,
					       struct uds_request *request)
{
	u64 sparse_virtual_chapter;

	if ((zone->index->zone_count > 1) ||
	    !uds_is_sparse_index_geometry(zone->index->volume->geometry))
		return UDS_SUCCESS;

	sparse_virtual_chapter = triage_index_request(zone->index, request);
	if (sparse_virtual_chapter == NO_CHAPTER)
		return UDS_SUCCESS;

	return uds_update_sparse_cache(zone, sparse_virtual_chapter);
}

/* This is the request processing function for the triage queue. */
static void triage_request(struct uds_request *request)
{
	struct uds_index *index = request->index;
	u64 sparse_virtual_chapter = triage_index_request(index, request);

	if (sparse_virtual_chapter != NO_CHAPTER)
		enqueue_barrier_messages(index, sparse_virtual_chapter);

	uds_enqueue_request(request, STAGE_INDEX);
}

/* Wait until the chapter writer has committed every chapter before current_chapter_number. */
static int finish_previous_chapter(struct uds_index *index, u64 current_chapter_number)
{
	int result;
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	while (index->newest_virtual_chapter < current_chapter_number)
		uds_wait_cond(&writer->cond, &writer->mutex);
	result = writer->result;
	mutex_unlock(&writer->mutex);

	if (result != UDS_SUCCESS)
		return vdo_log_error_strerror(result,
					      "Writing of previous open chapter failed");

	return UDS_SUCCESS;
}

/* Exchange the zone's open chapter with its writing chapter once the previous write is done. */
static int swap_open_chapter(struct index_zone *zone)
{
	int result;

	result = finish_previous_chapter(zone->index, zone->newest_virtual_chapter);
	if (result != UDS_SUCCESS)
		return result;

	swap(zone->open_chapter, zone->writing_chapter);
	return UDS_SUCCESS;
}

/*
 * Inform the chapter writer that this zone is done with this chapter. The chapter won't start
 * writing until all zones have closed it.
 */
static unsigned int start_closing_chapter(struct uds_index *index,
					  unsigned int zone_number,
					  struct open_chapter_zone *chapter)
{
	unsigned int finished_zones;
	struct chapter_writer *writer = index->chapter_writer;

	mutex_lock(&writer->mutex);
	finished_zones = ++writer->zones_to_write;
	writer->chapters[zone_number] = chapter;
	uds_broadcast_cond(&writer->cond);
	mutex_unlock(&writer->mutex);

	return finished_zones;
}

/* Tell every other zone that this zone has closed the given chapter. */
static int announce_chapter_closed(struct index_zone *zone, u64 closed_chapter)
{
	int result;
	unsigned int i;
	struct uds_zone_message zone_message = {
		.type = UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED,
		.virtual_chapter = closed_chapter,
	};

	for (i = 0; i < zone->index->zone_count; i++) {
		if (zone->id == i)
			continue;

		result = launch_zone_message(zone_message, i, zone->index);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}

/* Close this zone's open chapter, hand it to the chapter writer, and open the next one. */
static int open_next_chapter(struct index_zone *zone)
{
	int result;
	u64 closed_chapter;
	u64 expiring;
	unsigned int finished_zones;
	u32 expire_chapters;

	vdo_log_debug("closing chapter %llu of zone %u after %u entries (%u short)",
		      (unsigned long long) zone->newest_virtual_chapter, zone->id,
		      zone->open_chapter->size,
		      zone->open_chapter->capacity - zone->open_chapter->size);

	result = swap_open_chapter(zone);
	if (result != UDS_SUCCESS)
		return result;

	closed_chapter = zone->newest_virtual_chapter++;
	uds_set_volume_index_zone_open_chapter(zone->index->volume_index, zone->id,
					       zone->newest_virtual_chapter);
	uds_reset_open_chapter(zone->open_chapter);

	finished_zones = start_closing_chapter(zone->index, zone->id,
					       zone->writing_chapter);
	/* The first zone to close the chapter announces it to the other zones. */
	if ((finished_zones == 1) && (zone->index->zone_count > 1)) {
		result = announce_chapter_closed(zone, closed_chapter);
		if (result != UDS_SUCCESS)
			return result;
	}

	expiring = zone->oldest_virtual_chapter;
	expire_chapters = uds_chapters_to_expire(zone->index->volume->geometry,
						 zone->newest_virtual_chapter);
	zone->oldest_virtual_chapter += expire_chapters;

	if (finished_zones < zone->index->zone_count)
		return UDS_SUCCESS;

	/* The last zone to close the chapter also expires the oldest chapters. */
	while (expire_chapters-- > 0)
		uds_forget_chapter(zone->index->volume, expiring++);

	return UDS_SUCCESS;
}

/* Open the next chapter if this zone is still in the chapter being announced as closed. */
static int handle_chapter_closed(struct index_zone *zone, u64 virtual_chapter)
{
	if (zone->newest_virtual_chapter == virtual_chapter)
		return open_next_chapter(zone);

	return UDS_SUCCESS;
}

/* Process a control message sent to this request's zone. */
static int dispatch_index_zone_control_request(struct uds_request *request)
{
	struct uds_zone_message *message = &request->zone_message;
	struct index_zone *zone = request->index->zones[request->zone_number];

	switch (message->type) {
	case UDS_MESSAGE_SPARSE_CACHE_BARRIER:
		return uds_update_sparse_cache(zone, message->virtual_chapter);

	case UDS_MESSAGE_ANNOUNCE_CHAPTER_CLOSED:
		return handle_chapter_closed(zone, message->virtual_chapter);

	default:
		vdo_log_error("invalid message type: %d", message->type);
		return UDS_INVALID_ARGUMENT;
	}
}

/* Record the region in which the request's record was (or was not) found. */
static void set_request_location(struct uds_request *request,
				 enum uds_index_region new_location)
{
	request->location = new_location;
	request->found = ((new_location == UDS_LOCATION_IN_OPEN_CHAPTER) ||
			  (new_location == UDS_LOCATION_IN_DENSE) ||
			  (new_location == UDS_LOCATION_IN_SPARSE));
}

/* Mark the request found in whichever region contains virtual_chapter for this zone. */
static void set_chapter_location(struct uds_request *request,
				 const struct index_zone *zone, u64 virtual_chapter)
{
	request->found = true;
	if (virtual_chapter == zone->newest_virtual_chapter)
		request->location = UDS_LOCATION_IN_OPEN_CHAPTER;
	else if (is_zone_chapter_sparse(zone, virtual_chapter))
		request->location = UDS_LOCATION_IN_SPARSE;
	else
		request->location = UDS_LOCATION_IN_DENSE;
}

/* Search the sparse cache for the record name, then the matching cached record page. */
static int search_sparse_cache_in_zone(struct index_zone *zone, struct uds_request *request,
				       u64 virtual_chapter, bool *found)
{
	int result;
	struct volume *volume;
	u16 record_page_number;
	u32 chapter;

	result = uds_search_sparse_cache(zone, &request->record_name, &virtual_chapter,
					 &record_page_number);
	if ((result != UDS_SUCCESS) || (virtual_chapter == NO_CHAPTER))
		return result;

	request->virtual_chapter = virtual_chapter;
	volume = zone->index->volume;
	chapter = uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
	return uds_search_cached_record_page(volume, request, chapter,
					     record_page_number, found);
}

/* Look for the request's record in the in-memory chapters, the caches, or the volume. */
static int get_record_from_zone(struct index_zone *zone, struct uds_request *request,
				bool *found)
{
	struct volume *volume;

	/* A requeued request may already know whether its record is available. */
	if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
		*found = true;
		return UDS_SUCCESS;
	} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
		*found = false;
		return UDS_SUCCESS;
	}

	if (request->virtual_chapter == zone->newest_virtual_chapter) {
		uds_search_open_chapter(zone->open_chapter, &request->record_name,
					&request->old_metadata, found);
		return UDS_SUCCESS;
	}

	/* The previous chapter may still be in memory while it is being written. */
	if ((zone->newest_virtual_chapter > 0) &&
	    (request->virtual_chapter == (zone->newest_virtual_chapter - 1)) &&
	    (zone->writing_chapter->size > 0)) {
		uds_search_open_chapter(zone->writing_chapter, &request->record_name,
					&request->old_metadata, found);
		return UDS_SUCCESS;
	}

	volume = zone->index->volume;
	if (is_zone_chapter_sparse(zone, request->virtual_chapter) &&
	    uds_sparse_cache_contains(volume->sparse_cache, request->virtual_chapter,
				      request->zone_number))
		return search_sparse_cache_in_zone(zone, request,
						   request->virtual_chapter, found);

	return uds_search_volume_page_cache(volume, request, found);
}

/* Add the record to the open chapter, closing the chapter if it becomes full. */
static int put_record_in_zone(struct index_zone *zone, struct uds_request *request,
			      const struct uds_record_data *metadata)
{
	unsigned int remaining;

	remaining = uds_put_open_chapter(zone->open_chapter, &request->record_name,
					 metadata);
	if (remaining == 0)
		return open_next_chapter(zone);

	return UDS_SUCCESS;
}

/* Handle a lookup or update request in this zone. */
static int search_index_zone(struct index_zone *zone, struct uds_request *request)
{
	int result;
	struct volume_index_record record;
	bool overflow_record, found = false;
	struct uds_record_data *metadata;
	u64 chapter;

	result = uds_get_volume_index_record(zone->index->volume_index,
					     &request->record_name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (record.is_found) {
		/* A requeued request's saved location is stale if the chapter changed. */
		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
			set_request_location(request, UDS_LOCATION_UNKNOWN);

		request->virtual_chapter = record.virtual_chapter;
		result = get_record_from_zone(zone, request, &found);
		if (result != UDS_SUCCESS)
			return result;
	}

	if (found)
		set_chapter_location(request, zone, record.virtual_chapter);

	/*
	 * If a record has overflowed a chapter index in more than one chapter (or overflowed in
	 * one chapter and collided with an existing record), it will exist as a collision record
	 * in the volume index, but we won't find it in the volume. This case needs special
	 * handling.
	 */
	overflow_record = (record.is_found && record.is_collision && !found);
	chapter = zone->newest_virtual_chapter;
	if (found || overflow_record) {
		if ((request->type == UDS_QUERY_NO_UPDATE) ||
		    ((request->type == UDS_QUERY) && overflow_record)) {
			/* There is nothing left to do. */
			return UDS_SUCCESS;
		}

		if (record.virtual_chapter != chapter) {
			/*
			 * Update the volume index to reference the new chapter for the block. If
			 * the record had been deleted or dropped from the chapter index, it will
			 * be back.
			 */
			result = uds_set_volume_index_record_chapter(&record, chapter);
		} else if (request->type != UDS_UPDATE) {
			/* The record is already in the open chapter. */
			return UDS_SUCCESS;
		}
	} else {
		/*
		 * The record wasn't in the volume index, so check whether the
		 * name is in a cached sparse chapter. If we found the name on
		 * a previous search, use that result instead.
		 */
		if (request->location == UDS_LOCATION_RECORD_PAGE_LOOKUP) {
			found = true;
		} else if (request->location == UDS_LOCATION_UNAVAILABLE) {
			found = false;
		} else if (uds_is_sparse_index_geometry(zone->index->volume->geometry) &&
			   !uds_is_volume_index_sample(zone->index->volume_index,
						       &request->record_name)) {
			result = search_sparse_cache_in_zone(zone, request, NO_CHAPTER,
							     &found);
			if (result != UDS_SUCCESS)
				return result;
		}

		if (found)
			set_request_location(request, UDS_LOCATION_IN_SPARSE);

		if ((request->type == UDS_QUERY_NO_UPDATE) ||
		    ((request->type == UDS_QUERY) && !found)) {
			/* There is nothing left to do. */
			return UDS_SUCCESS;
		}

		/*
		 * Add a new entry to the volume index referencing the open chapter. This needs to
		 * be done both for new records, and for records from cached sparse chapters.
		 */
		result = uds_put_volume_index_record(&record, chapter);
	}

	if (result == UDS_OVERFLOW) {
		/*
		 * The volume index encountered a delta list overflow. The condition was already
		 * logged. We will go on without adding the record to the open chapter.
		 */
		return UDS_SUCCESS;
	}

	if (result != UDS_SUCCESS)
		return result;

	if (!found || (request->type == UDS_UPDATE)) {
		/* This is a new record or we're updating an existing record. */
		metadata = &request->new_metadata;
	} else {
		/* Move the existing record to the open chapter. */
		metadata = &request->old_metadata;
	}

	return put_record_in_zone(zone, request, metadata);
}

/* Handle a delete request in this zone. */
static int remove_from_index_zone(struct index_zone *zone, struct uds_request *request)
{
	int result;
	struct volume_index_record record;

	result = uds_get_volume_index_record(zone->index->volume_index,
					     &request->record_name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (!record.is_found)
		return UDS_SUCCESS;

	/* If the request was requeued, check whether the saved state is still valid. */

	if (record.is_collision) {
		set_chapter_location(request, zone, record.virtual_chapter);
	} else {
		/* Non-collision records are hints, so resolve the name in the chapter. */
		bool found;

		if (request->requeued && request->virtual_chapter != record.virtual_chapter)
			set_request_location(request, UDS_LOCATION_UNKNOWN);

		request->virtual_chapter = record.virtual_chapter;
		result = get_record_from_zone(zone, request, &found);
		if (result != UDS_SUCCESS)
			return result;

		if (!found) {
			/* There is no record to remove. */
			return UDS_SUCCESS;
		}
	}

	set_chapter_location(request, zone, record.virtual_chapter);

	/*
	 * Delete the volume index entry for the named record only.
	 * Note that a later search might return stale advice if there is a colliding name
	 * in the same chapter, but it's a very rare case (1 in 2^21).
	 */
	result = uds_remove_volume_index_record(&record);
	if (result != UDS_SUCCESS)
		return result;

	/*
	 * If the record is in the open chapter, we must remove it or mark it deleted to avoid
	 * trouble if the record is added again later.
	 */
	if (request->location == UDS_LOCATION_IN_OPEN_CHAPTER)
		uds_remove_from_open_chapter(zone->open_chapter, &request->record_name);

	return UDS_SUCCESS;
}

/* Route an index request to the handler for its request type. */
static int dispatch_index_request(struct uds_index *index, struct uds_request *request)
{
	int result;
	struct index_zone *zone = index->zones[request->zone_number];

	if (!request->requeued) {
		/* Single-zone sparse indexes have no triage queue; simulate the barrier. */
		result = simulate_index_zone_barrier_message(zone, request);
		if (result != UDS_SUCCESS)
			return result;
	}

	switch (request->type) {
	case UDS_POST:
	case UDS_UPDATE:
	case UDS_QUERY:
	case UDS_QUERY_NO_UPDATE:
		result = search_index_zone(zone, request);
		break;

	case UDS_DELETE:
		result = remove_from_index_zone(zone, request);
		break;

	default:
		result = vdo_log_warning_strerror(UDS_INVALID_ARGUMENT,
						  "invalid request type: %d",
						  request->type);
		break;
	}

	return result;
}

/* This is the request processing function invoked by each zone's thread. */
static void execute_zone_request(struct uds_request *request)
{
	int result;
	struct uds_index *index = request->index;

	if (request->zone_message.type != UDS_MESSAGE_NONE) {
		result = dispatch_index_zone_control_request(request);
		if (result != UDS_SUCCESS) {
			vdo_log_error_strerror(result, "error executing message: %d",
					       request->zone_message.type);
		}

		/* Once the message is processed it can be freed. */
		vdo_free(vdo_forget(request));
		return;
	}

	index->need_to_save = true;
	if (request->requeued && (request->status != UDS_SUCCESS)) {
		set_request_location(request, UDS_LOCATION_UNAVAILABLE);
		index->callback(request);
		return;
	}

	result = dispatch_index_request(index, request);
	if (result == UDS_QUEUED) {
		/* The request has been requeued so don't let it complete. */
		return;
	}

	if (!request->found)
		set_request_location(request, UDS_LOCATION_UNAVAILABLE);

	request->status = result;
	index->callback(request);
}

/* Create the per-zone request queues and, when needed, the triage queue. */
static int initialize_index_queues(struct uds_index *index,
				   const struct index_geometry *geometry)
{
	int result;
	unsigned int i;

	for (i = 0; i < index->zone_count; i++) {
		result = uds_make_request_queue("indexW", &execute_zone_request,
						&index->zone_queues[i]);
		if (result != UDS_SUCCESS)
			return result;
	}

	/* The triage queue is only needed for sparse multi-zone indexes. */
	if ((index->zone_count > 1) && uds_is_sparse_index_geometry(geometry)) {
		result = uds_make_request_queue("triageW", &triage_request,
						&index->triage_queue);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}

/* This is the driver function for the chapter writer thread. */
static void close_chapters(void *arg)
{
	int result;
	struct chapter_writer *writer = arg;
	struct uds_index *index = writer->index;

	vdo_log_debug("chapter writer starting");
	mutex_lock(&writer->mutex);
	for (;;) {
		while (writer->zones_to_write < index->zone_count) {
			if (writer->stop && (writer->zones_to_write == 0)) {
				/*
				 * We've been told to stop, and all of the zones are in the same
				 * open chapter, so we can exit now.
				 */
				mutex_unlock(&writer->mutex);
				vdo_log_debug("chapter writer stopping");
				return;
			}
			uds_wait_cond(&writer->cond, &writer->mutex);
		}

		/*
		 * Release the lock while closing a chapter. We probably don't need to do this, but
		 * it seems safer in principle. It's OK to access the chapter and chapter_number
		 * fields without the lock since those aren't allowed to change until we're done.
		 */
		mutex_unlock(&writer->mutex);

		if (index->has_saved_open_chapter) {
			/*
			 * Remove the saved open chapter the first time we close an open chapter
			 * after loading from a clean shutdown, or after doing a clean save. The
			 * lack of the saved open chapter will indicate that a recovery is
			 * necessary.
			 */
			index->has_saved_open_chapter = false;
			result = uds_discard_open_chapter(index->layout);
			if (result == UDS_SUCCESS)
				vdo_log_debug("Discarding saved open chapter");
		}

		result = uds_close_open_chapter(writer->chapters, index->zone_count,
						index->volume,
						writer->open_chapter_index,
						writer->collated_records,
						index->newest_virtual_chapter);

		mutex_lock(&writer->mutex);
		index->newest_virtual_chapter++;
		index->oldest_virtual_chapter +=
			uds_chapters_to_expire(index->volume->geometry,
					       index->newest_virtual_chapter);
		writer->result = result;
		writer->zones_to_write = 0;
		uds_broadcast_cond(&writer->cond);
	}
}

/* Ask the chapter writer thread to stop, then join it. */
static void stop_chapter_writer(struct chapter_writer *writer)
{
	struct thread *writer_thread = NULL;

	mutex_lock(&writer->mutex);
	if (writer->thread != NULL) {
		writer_thread = writer->thread;
		writer->thread = NULL;
		writer->stop = true;
		uds_broadcast_cond(&writer->cond);
	}
	mutex_unlock(&writer->mutex);

	if (writer_thread != NULL)
		vdo_join_threads(writer_thread);
}

/* Stop the writer thread and release all of the chapter writer's resources. */
static void free_chapter_writer(struct chapter_writer *writer)
{
	if (writer == NULL)
		return;

	stop_chapter_writer(writer);
	uds_free_open_chapter_index(writer->open_chapter_index);
	vdo_free(writer->collated_records);
	vdo_free(writer);
}

/* Allocate and initialize the chapter writer and start its thread. */
static int make_chapter_writer(struct uds_index *index,
			       struct chapter_writer **writer_ptr)
{
	int result;
	struct chapter_writer *writer;
	size_t collated_records_size =
		(sizeof(struct uds_volume_record) * index->volume->geometry->records_per_chapter);

	result = vdo_allocate_extended(struct chapter_writer, index->zone_count,
				       struct open_chapter_zone *, "Chapter Writer",
				       &writer);
	if (result != VDO_SUCCESS)
		return result;

	writer->index = index;
	mutex_init(&writer->mutex);
	uds_init_cond(&writer->cond);

	result = vdo_allocate_cache_aligned(collated_records_size, "collated records",
					    &writer->collated_records);
	if (result != VDO_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	result = uds_make_open_chapter_index(&writer->open_chapter_index,
					     index->volume->geometry,
					     index->volume->nonce);
	if (result != UDS_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	writer->memory_size = (sizeof(struct chapter_writer) +
			       index->zone_count * sizeof(struct open_chapter_zone *) +
			       collated_records_size +
			       writer->open_chapter_index->memory_size);

	result = vdo_create_thread(close_chapters, writer, "writer", &writer->thread);
	if (result != VDO_SUCCESS) {
		free_chapter_writer(writer);
		return result;
	}

	*writer_ptr = writer;
	return UDS_SUCCESS;
}

/* Load a cleanly saved index state from storage. */
static int load_index(struct uds_index *index)
{
	int result;
	u64 last_save_chapter;

	result = uds_load_index_state(index->layout, index);
	if (result != UDS_SUCCESS)
		return UDS_INDEX_NOT_SAVED_CLEANLY;

	last_save_chapter = ((index->last_save != NO_LAST_SAVE) ?
			     index->last_save : 0);

	vdo_log_info("loaded index from chapter %llu through chapter %llu",
		     (unsigned long long) index->oldest_virtual_chapter,
		     (unsigned long long) last_save_chapter);

	return UDS_SUCCESS;
}

/* Rebuild the index page map for one chapter by reading each of its index pages. */
static int rebuild_index_page_map(struct uds_index *index, u64 vcn)
{
	int result;
	struct delta_index_page *chapter_index_page;
	struct index_geometry *geometry = index->volume->geometry;
	u32 chapter = uds_map_to_physical_chapter(geometry, vcn);
	u32 expected_list_number = 0;
	u32 index_page_number;
	u32 lowest_delta_list;
	u32 highest_delta_list;

	for (index_page_number = 0;
	     index_page_number < geometry->index_pages_per_chapter;
	     index_page_number++) {
		result = uds_get_volume_index_page(index->volume, chapter,
						   index_page_number,
						   &chapter_index_page);
		if (result != UDS_SUCCESS) {
			return vdo_log_error_strerror(result,
						      "failed to read index page %u in chapter %u",
						      index_page_number, chapter);
		}

		lowest_delta_list = chapter_index_page->lowest_list_number;
		highest_delta_list = chapter_index_page->highest_list_number;
		/* The delta lists of a chapter must be contiguous across its index pages. */
		if (lowest_delta_list != expected_list_number) {
			return vdo_log_error_strerror(UDS_CORRUPT_DATA,
						      "chapter %u index page %u is corrupt",
						      chapter, index_page_number);
		}

		uds_update_index_page_map(index->volume->index_page_map, vcn, chapter,
					  index_page_number, highest_delta_list);
		expected_list_number = highest_delta_list + 1;
	}

	return UDS_SUCCESS;
}

/* Re-add one record from the volume to the volume index during a rebuild. */
static int replay_record(struct uds_index *index, const struct uds_record_name *name,
			 u64 virtual_chapter, bool will_be_sparse_chapter)
{
	int result;
	struct volume_index_record record;
	bool update_record;

	if (will_be_sparse_chapter &&
	    !uds_is_volume_index_sample(index->volume_index, name)) {
		/*
		 * This entry will be in a sparse chapter after the rebuild completes, and it is
		 * not a sample, so just skip over it.
		 */
		return UDS_SUCCESS;
	}

	result = uds_get_volume_index_record(index->volume_index, name, &record);
	if (result != UDS_SUCCESS)
		return result;

	if (record.is_found) {
		if (record.is_collision) {
			if (record.virtual_chapter == virtual_chapter) {
				/* The record is already correct. */
				return UDS_SUCCESS;
			}

			update_record = true;
		} else if (record.virtual_chapter == virtual_chapter) {
			/*
			 * There is a volume index entry pointing to the current chapter, but we
			 * don't know if it is for the same name as the one we are currently
			 * working on or not. For now, we're just going to assume that it isn't.
			 * This will create one extra collision record if there was a deleted
			 * record in the current chapter.
			 */
			update_record = false;
		} else {
			/*
			 * If we're rebuilding, we don't normally want to go to disk to see if the
			 * record exists, since we will likely have just read the record from disk
			 * (i.e. we know it's there). The exception to this is when we find an
			 * entry in the volume index that has a different chapter. In this case, we
			 * need to search that chapter to determine if the volume index entry was
			 * for the same record or a different one.
			 */
			result = uds_search_volume_page_cache_for_rebuild(index->volume,
									  name,
									  record.virtual_chapter,
									  &update_record);
			if (result != UDS_SUCCESS)
				return result;
		}
	} else {
		update_record = false;
	}

	if (update_record) {
		/*
		 * Update the volume index to reference the new chapter for the block. If the
		 * record had been deleted or dropped from the chapter index, it will be back.
		 */
		result = uds_set_volume_index_record_chapter(&record, virtual_chapter);
	} else {
		/*
		 * Add a new entry to the volume index referencing the open chapter. This should be
		 * done regardless of whether we are a brand new record or a sparse record, i.e.
		 * one that doesn't exist in the index but does on disk, since for a sparse record,
		 * we would want to un-sparsify if it did exist.
		 */
		result = uds_put_volume_index_record(&record, virtual_chapter);
	}

	if ((result == UDS_DUPLICATE_NAME) || (result == UDS_OVERFLOW)) {
		/* The rebuilt index will lose these records. */
		return UDS_SUCCESS;
	}

	return result;
}

/* Pause the rebuild while the index is suspending; return true if the index is freeing. */
static bool check_for_suspend(struct uds_index *index)
{
	bool closing;

	if (index->load_context == NULL)
		return false;

	mutex_lock(&index->load_context->mutex);
	if (index->load_context->status != INDEX_SUSPENDING) {
		mutex_unlock(&index->load_context->mutex);
		return false;
	}

	/* Notify that we are suspended and wait for the resume. */
	index->load_context->status = INDEX_SUSPENDED;
	uds_broadcast_cond(&index->load_context->cond);

	while ((index->load_context->status != INDEX_OPENING) &&
	       (index->load_context->status != INDEX_FREEING))
		uds_wait_cond(&index->load_context->cond, &index->load_context->mutex);

	closing = (index->load_context->status == INDEX_FREEING);
	mutex_unlock(&index->load_context->mutex);
	return closing;
}

/* Replay every record of one chapter into the volume index. */
static int replay_chapter(struct uds_index *index, u64 virtual, bool sparse)
{
	int result;
	u32 i;
	u32 j;
	const struct index_geometry *geometry;
	u32 physical_chapter;

	if (check_for_suspend(index)) {
		vdo_log_info("Replay interrupted by index shutdown at chapter %llu",
			     (unsigned long long) virtual);
		return -EBUSY;
	}

	geometry = index->volume->geometry;
	physical_chapter = uds_map_to_physical_chapter(geometry, virtual);
	uds_prefetch_volume_chapter(index->volume, physical_chapter);
	uds_set_volume_index_open_chapter(index->volume_index, virtual);

	result = rebuild_index_page_map(index, virtual);
	if (result != UDS_SUCCESS) {
		return vdo_log_error_strerror(result,
					      "could not rebuild index page map for chapter %u",
					      physical_chapter);
	}

	for (i = 0; i < geometry->record_pages_per_chapter; i++) {
		u8 *record_page;
		u32 record_page_number;

		/* Record pages follow the index pages within each chapter. */
		record_page_number = geometry->index_pages_per_chapter + i;
		result = uds_get_volume_record_page(index->volume, physical_chapter,
						    record_page_number, &record_page);
		if (result != UDS_SUCCESS) {
			return vdo_log_error_strerror(result, "could not get page %d",
						      record_page_number);
		}

		for (j = 0; j < geometry->records_per_page; j++) {
			const u8 *name_bytes;
			struct uds_record_name name;

			name_bytes = record_page + (j * BYTES_PER_RECORD);
			memcpy(&name.name, name_bytes, UDS_RECORD_NAME_SIZE);
			result = replay_record(index, &name, virtual, sparse);
			if (result != UDS_SUCCESS)
				return result;
		}
	}

	return UDS_SUCCESS;
}

/* Rebuild the volume index by replaying all chapters from oldest to newest. */
static int replay_volume(struct uds_index *index)
{
	int result;
	u64 old_map_update;
	u64 new_map_update;
	u64 virtual;
	u64 from_virtual = index->oldest_virtual_chapter;
	u64 upto_virtual = index->newest_virtual_chapter;
	bool will_be_sparse;

	vdo_log_info("Replaying volume from chapter %llu through chapter %llu",
		     (unsigned long long) from_virtual,
		     (unsigned long long) upto_virtual);

	/*
	 * The index failed to load, so the volume index is empty. Add records to the volume index
	 * in order, skipping non-hooks in chapters which will be sparse to save time.
	 *
	 * Go through each record page of each chapter and add the records back to the volume
	 * index. This should not cause anything to be written to either the open chapter or the
	 * on-disk volume.
Also skip the on-disk chapter corresponding to upto_virtual, as this 1045 * would have already been purged from the volume index when the chapter was opened. 1046 * 1047 * Also, go through each index page for each chapter and rebuild the index page map. 1048 */ 1049 old_map_update = index->volume->index_page_map->last_update; 1050 for (virtual = from_virtual; virtual < upto_virtual; virtual++) { 1051 will_be_sparse = uds_is_chapter_sparse(index->volume->geometry, 1052 from_virtual, upto_virtual, 1053 virtual); 1054 result = replay_chapter(index, virtual, will_be_sparse); 1055 if (result != UDS_SUCCESS) 1056 return result; 1057 } 1058 1059 /* Also reap the chapter being replaced by the open chapter. */ 1060 uds_set_volume_index_open_chapter(index->volume_index, upto_virtual); 1061 1062 new_map_update = index->volume->index_page_map->last_update; 1063 if (new_map_update != old_map_update) { 1064 vdo_log_info("replay changed index page map update from %llu to %llu", 1065 (unsigned long long) old_map_update, 1066 (unsigned long long) new_map_update); 1067 } 1068 1069 return UDS_SUCCESS; 1070 } 1071 1072 static int rebuild_index(struct uds_index *index) 1073 { 1074 int result; 1075 u64 lowest; 1076 u64 highest; 1077 bool is_empty = false; 1078 u32 chapters_per_volume = index->volume->geometry->chapters_per_volume; 1079 1080 index->volume->lookup_mode = LOOKUP_FOR_REBUILD; 1081 result = uds_find_volume_chapter_boundaries(index->volume, &lowest, &highest, 1082 &is_empty); 1083 if (result != UDS_SUCCESS) { 1084 return vdo_log_fatal_strerror(result, 1085 "cannot rebuild index: unknown volume chapter boundaries"); 1086 } 1087 1088 if (is_empty) { 1089 index->newest_virtual_chapter = 0; 1090 index->oldest_virtual_chapter = 0; 1091 index->volume->lookup_mode = LOOKUP_NORMAL; 1092 return UDS_SUCCESS; 1093 } 1094 1095 index->newest_virtual_chapter = highest + 1; 1096 index->oldest_virtual_chapter = lowest; 1097 if (index->newest_virtual_chapter == 1098 
(index->oldest_virtual_chapter + chapters_per_volume)) { 1099 /* Skip the chapter shadowed by the open chapter. */ 1100 index->oldest_virtual_chapter++; 1101 } 1102 1103 result = replay_volume(index); 1104 if (result != UDS_SUCCESS) 1105 return result; 1106 1107 index->volume->lookup_mode = LOOKUP_NORMAL; 1108 return UDS_SUCCESS; 1109 } 1110 1111 static void free_index_zone(struct index_zone *zone) 1112 { 1113 if (zone == NULL) 1114 return; 1115 1116 uds_free_open_chapter(zone->open_chapter); 1117 uds_free_open_chapter(zone->writing_chapter); 1118 vdo_free(zone); 1119 } 1120 1121 static int make_index_zone(struct uds_index *index, unsigned int zone_number) 1122 { 1123 int result; 1124 struct index_zone *zone; 1125 1126 result = vdo_allocate(1, struct index_zone, "index zone", &zone); 1127 if (result != VDO_SUCCESS) 1128 return result; 1129 1130 result = uds_make_open_chapter(index->volume->geometry, index->zone_count, 1131 &zone->open_chapter); 1132 if (result != UDS_SUCCESS) { 1133 free_index_zone(zone); 1134 return result; 1135 } 1136 1137 result = uds_make_open_chapter(index->volume->geometry, index->zone_count, 1138 &zone->writing_chapter); 1139 if (result != UDS_SUCCESS) { 1140 free_index_zone(zone); 1141 return result; 1142 } 1143 1144 zone->index = index; 1145 zone->id = zone_number; 1146 index->zones[zone_number] = zone; 1147 1148 return UDS_SUCCESS; 1149 } 1150 1151 int uds_make_index(struct uds_configuration *config, enum uds_open_index_type open_type, 1152 struct index_load_context *load_context, index_callback_fn callback, 1153 struct uds_index **new_index) 1154 { 1155 int result; 1156 bool loaded = false; 1157 bool new = (open_type == UDS_CREATE); 1158 struct uds_index *index = NULL; 1159 struct index_zone *zone; 1160 u64 nonce; 1161 unsigned int z; 1162 1163 result = vdo_allocate_extended(struct uds_index, config->zone_count, 1164 struct uds_request_queue *, "index", &index); 1165 if (result != VDO_SUCCESS) 1166 return result; 1167 1168 
index->zone_count = config->zone_count; 1169 1170 result = uds_make_index_layout(config, new, &index->layout); 1171 if (result != UDS_SUCCESS) { 1172 uds_free_index(index); 1173 return result; 1174 } 1175 1176 result = vdo_allocate(index->zone_count, struct index_zone *, "zones", 1177 &index->zones); 1178 if (result != VDO_SUCCESS) { 1179 uds_free_index(index); 1180 return result; 1181 } 1182 1183 result = uds_make_volume(config, index->layout, &index->volume); 1184 if (result != UDS_SUCCESS) { 1185 uds_free_index(index); 1186 return result; 1187 } 1188 1189 index->volume->lookup_mode = LOOKUP_NORMAL; 1190 for (z = 0; z < index->zone_count; z++) { 1191 result = make_index_zone(index, z); 1192 if (result != UDS_SUCCESS) { 1193 uds_free_index(index); 1194 return vdo_log_error_strerror(result, 1195 "Could not create index zone"); 1196 } 1197 } 1198 1199 nonce = uds_get_volume_nonce(index->layout); 1200 result = uds_make_volume_index(config, nonce, &index->volume_index); 1201 if (result != UDS_SUCCESS) { 1202 uds_free_index(index); 1203 return vdo_log_error_strerror(result, "could not make volume index"); 1204 } 1205 1206 index->load_context = load_context; 1207 index->callback = callback; 1208 1209 result = initialize_index_queues(index, config->geometry); 1210 if (result != UDS_SUCCESS) { 1211 uds_free_index(index); 1212 return result; 1213 } 1214 1215 result = make_chapter_writer(index, &index->chapter_writer); 1216 if (result != UDS_SUCCESS) { 1217 uds_free_index(index); 1218 return result; 1219 } 1220 1221 if (!new) { 1222 result = load_index(index); 1223 switch (result) { 1224 case UDS_SUCCESS: 1225 loaded = true; 1226 break; 1227 case -ENOMEM: 1228 /* We should not try a rebuild for this error. 
*/ 1229 vdo_log_error_strerror(result, "index could not be loaded"); 1230 break; 1231 default: 1232 vdo_log_error_strerror(result, "index could not be loaded"); 1233 if (open_type == UDS_LOAD) { 1234 result = rebuild_index(index); 1235 if (result != UDS_SUCCESS) { 1236 vdo_log_error_strerror(result, 1237 "index could not be rebuilt"); 1238 } 1239 } 1240 break; 1241 } 1242 } 1243 1244 if (result != UDS_SUCCESS) { 1245 uds_free_index(index); 1246 return vdo_log_error_strerror(result, "fatal error in %s()", __func__); 1247 } 1248 1249 for (z = 0; z < index->zone_count; z++) { 1250 zone = index->zones[z]; 1251 zone->oldest_virtual_chapter = index->oldest_virtual_chapter; 1252 zone->newest_virtual_chapter = index->newest_virtual_chapter; 1253 } 1254 1255 if (index->load_context != NULL) { 1256 mutex_lock(&index->load_context->mutex); 1257 index->load_context->status = INDEX_READY; 1258 /* 1259 * If we get here, suspend is meaningless, but notify any thread trying to suspend 1260 * us so it doesn't hang. 
1261 */ 1262 uds_broadcast_cond(&index->load_context->cond); 1263 mutex_unlock(&index->load_context->mutex); 1264 } 1265 1266 index->has_saved_open_chapter = loaded; 1267 index->need_to_save = !loaded; 1268 *new_index = index; 1269 return UDS_SUCCESS; 1270 } 1271 1272 void uds_free_index(struct uds_index *index) 1273 { 1274 unsigned int i; 1275 1276 if (index == NULL) 1277 return; 1278 1279 uds_request_queue_finish(index->triage_queue); 1280 for (i = 0; i < index->zone_count; i++) 1281 uds_request_queue_finish(index->zone_queues[i]); 1282 1283 free_chapter_writer(index->chapter_writer); 1284 1285 uds_free_volume_index(index->volume_index); 1286 if (index->zones != NULL) { 1287 for (i = 0; i < index->zone_count; i++) 1288 free_index_zone(index->zones[i]); 1289 vdo_free(index->zones); 1290 } 1291 1292 uds_free_volume(index->volume); 1293 uds_free_index_layout(vdo_forget(index->layout)); 1294 vdo_free(index); 1295 } 1296 1297 /* Wait for the chapter writer to complete any outstanding writes. */ 1298 void uds_wait_for_idle_index(struct uds_index *index) 1299 { 1300 struct chapter_writer *writer = index->chapter_writer; 1301 1302 mutex_lock(&writer->mutex); 1303 while (writer->zones_to_write > 0) 1304 uds_wait_cond(&writer->cond, &writer->mutex); 1305 mutex_unlock(&writer->mutex); 1306 } 1307 1308 /* This function assumes that all requests have been drained. */ 1309 int uds_save_index(struct uds_index *index) 1310 { 1311 int result; 1312 1313 if (!index->need_to_save) 1314 return UDS_SUCCESS; 1315 1316 uds_wait_for_idle_index(index); 1317 index->prev_save = index->last_save; 1318 index->last_save = ((index->newest_virtual_chapter == 0) ? 
1319 NO_LAST_SAVE : index->newest_virtual_chapter - 1); 1320 vdo_log_info("beginning save (vcn %llu)", (unsigned long long) index->last_save); 1321 1322 result = uds_save_index_state(index->layout, index); 1323 if (result != UDS_SUCCESS) { 1324 vdo_log_info("save index failed"); 1325 index->last_save = index->prev_save; 1326 } else { 1327 index->has_saved_open_chapter = true; 1328 index->need_to_save = false; 1329 vdo_log_info("finished save (vcn %llu)", 1330 (unsigned long long) index->last_save); 1331 } 1332 1333 return result; 1334 } 1335 1336 int uds_replace_index_storage(struct uds_index *index, struct block_device *bdev) 1337 { 1338 return uds_replace_volume_storage(index->volume, index->layout, bdev); 1339 } 1340 1341 /* Accessing statistics should be safe from any thread. */ 1342 void uds_get_index_stats(struct uds_index *index, struct uds_index_stats *counters) 1343 { 1344 struct volume_index_stats stats; 1345 1346 uds_get_volume_index_stats(index->volume_index, &stats); 1347 counters->entries_indexed = stats.record_count; 1348 counters->collisions = stats.collision_count; 1349 counters->entries_discarded = stats.discard_count; 1350 1351 counters->memory_used = (index->volume_index->memory_size + 1352 index->volume->cache_size + 1353 index->chapter_writer->memory_size); 1354 } 1355 1356 void uds_enqueue_request(struct uds_request *request, enum request_stage stage) 1357 { 1358 struct uds_index *index = request->index; 1359 struct uds_request_queue *queue; 1360 1361 switch (stage) { 1362 case STAGE_TRIAGE: 1363 if (index->triage_queue != NULL) { 1364 queue = index->triage_queue; 1365 break; 1366 } 1367 1368 fallthrough; 1369 1370 case STAGE_INDEX: 1371 request->zone_number = 1372 uds_get_volume_index_zone(index->volume_index, &request->record_name); 1373 fallthrough; 1374 1375 case STAGE_MESSAGE: 1376 queue = index->zone_queues[request->zone_number]; 1377 break; 1378 1379 default: 1380 VDO_ASSERT_LOG_ONLY(false, "invalid index stage: %d", stage); 1381 
return; 1382 } 1383 1384 uds_request_queue_enqueue(queue, request); 1385 } 1386