// SPDX-License-Identifier: 0BSD

///////////////////////////////////////////////////////////////////////////////
//
/// \file       stream_decoder_mt.c
/// \brief      Multithreaded .xz Stream decoder
//
//  Authors:    Sebastian Andrzej Siewior
//              Lasse Collin
//
///////////////////////////////////////////////////////////////////////////////

#include "common.h"
#include "block_decoder.h"
#include "stream_decoder.h"
#include "index.h"
#include "outqueue.h"


/// State of a worker thread (struct worker_thread.state). The comments of
/// the individual values tell which thread is allowed to make which
/// state transition.
typedef enum {
	/// Waiting for work.
	/// Main thread may change this to THR_RUN or THR_EXIT.
	THR_IDLE,

	/// Decoding is in progress.
	/// Main thread may change this to THR_IDLE or THR_EXIT.
	/// The worker thread may change this to THR_IDLE.
	THR_RUN,

	/// The main thread wants the thread to exit.
	THR_EXIT,

} worker_state;


/// Per-thread data of one worker thread. The structures live in the
/// coder->threads[] array and are initialized one at a time by
/// initialize_new_thread() when a new thread is created.
struct worker_thread {
	/// Worker state is protected with our mutex.
	worker_state state;

	/// Input buffer that will contain the whole Block except Block Header.
	uint8_t *in;

	/// Amount of memory allocated for "in"
	size_t in_size;

	/// Number of bytes written to "in" by the main thread
	size_t in_filled;

	/// Number of bytes consumed from "in" by the worker thread.
	size_t in_pos;

	/// Amount of uncompressed data that has been decoded. This local
	/// copy is needed because updating outbuf->pos requires locking
	/// the main mutex (coder->mutex).
	size_t out_pos;

	/// Pointer to the main structure is needed to (1) lock the main
	/// mutex (coder->mutex) when updating outbuf->pos and (2) when
	/// putting this thread back to the stack of free threads.
	struct lzma_stream_coder *coder;

	/// The allocator is set by the main thread. Since a copy of the
	/// pointer is kept here, the application must not change the
	/// allocator before calling lzma_end().
	const lzma_allocator *allocator;

	/// Output queue buffer to which the uncompressed data is written.
	lzma_outbuf *outbuf;

	/// Amount of compressed data that has already been decompressed.
	/// This is updated from in_pos when our mutex is locked.
	/// This is size_t, not uint64_t, because per-thread progress
	/// is limited to sizes of allocated buffers.
	size_t progress_in;

	/// Like progress_in but for uncompressed data.
	size_t progress_out;

	/// Updating outbuf->pos requires locking the main mutex
	/// (coder->mutex). Since the main thread will only read output
	/// from the oldest outbuf in the queue, only the worker thread
	/// that is associated with the oldest outbuf needs to update its
	/// outbuf->pos. This avoids useless mutex contention that would
	/// happen if all worker threads were frequently locking the main
	/// mutex to update their outbuf->pos.
	///
	/// When partial_update_enabled is true, this worker thread will
	/// update outbuf->pos and outbuf->decoder_in_pos after each call
	/// to the Block decoder. This is initially false. Main thread may
	/// set this to true.
	bool partial_update_enabled;

	/// Once the main thread has set partial_update_enabled = true,
	/// we will do the first partial update as soon as we can and
	/// set partial_update_started = true. After the first update,
	/// we only update if we have made progress. This avoids useless
	/// locking of thr->coder->mutex.
	bool partial_update_started;

	/// Block decoder
	lzma_next_coder block_decoder;

	/// Thread-specific Block options are needed because the Block
	/// decoder modifies the struct given to it at initialization.
	lzma_block block_options;

	/// Filter chain memory usage
	uint64_t mem_filters;

	/// Next structure in the stack of free worker threads.
	struct worker_thread *next;

	/// The mutex of this worker thread ("our mutex" in the comments
	/// above) and the condition variable on which the worker thread
	/// sleeps in worker_decoder() while it is idle or has no new input.
	mythread_mutex mutex;
	mythread_cond cond;

	/// The ID of this thread is used to join the thread
	/// when it's not needed anymore.
	mythread thread_id;
};


/// Main state of the multithreaded Stream decoder. The members marked
/// with "Use mutex" are also accessed by the worker threads in
/// worker_decoder() and thus need coder->mutex.
struct lzma_stream_coder {
	/// Current state of the decoder state machine.
	/// stream_decoder_reset() sets this to SEQ_STREAM_HEADER.
	enum {
		SEQ_STREAM_HEADER,
		SEQ_BLOCK_HEADER,
		SEQ_BLOCK_INIT,
		SEQ_BLOCK_THR_INIT,
		SEQ_BLOCK_THR_RUN,
		SEQ_BLOCK_DIRECT_INIT,
		SEQ_BLOCK_DIRECT_RUN,
		SEQ_INDEX_WAIT_OUTPUT,
		SEQ_INDEX_DECODE,
		SEQ_STREAM_FOOTER,
		SEQ_STREAM_PADDING,
		SEQ_ERROR,
	} sequence;

	/// Block decoder
	lzma_next_coder block_decoder;

	/// Every Block Header will be decoded into this structure.
	/// This is also used to initialize a Block decoder when in
	/// direct mode. In threaded mode, a thread-specific copy will
	/// be made for decoder initialization because the Block decoder
	/// will modify the structure given to it.
	lzma_block block_options;

	/// Buffer to hold a filter chain for Block Header decoding and
	/// initialization. These are freed after successful Block decoder
	/// initialization or at stream_decoder_mt_end(). The thread-specific
	/// copy of block_options won't hold a pointer to filters[] after
	/// initialization.
	lzma_filter filters[LZMA_FILTERS_MAX + 1];

	/// Stream Flags from Stream Header
	lzma_stream_flags stream_flags;

	/// Index is hashed so that it can be compared to the sizes of Blocks
	/// with O(1) memory usage.
	lzma_index_hash *index_hash;


	/// Maximum wait time if cannot use all the input and cannot
	/// fill the output buffer. This is in milliseconds.
	/// Zero means that no timeout is used (read_output_and_wait()
	/// then waits without a time limit).
	uint32_t timeout;


	/// Error code from a worker thread.
	///
	/// \note       Use mutex.
	lzma_ret thread_error;

	/// Error code to return after pending output has been copied out. If
	/// set in read_output_and_wait(), this is a mirror of thread_error.
	/// If set in stream_decode_mt() then it's, for example, error that
	/// occurred when decoding Block Header.
	lzma_ret pending_error;

	/// Number of threads that will be created at maximum.
	uint32_t threads_max;

	/// Number of thread structures that have been initialized from
	/// "threads", and thus the number of worker threads actually
	/// created so far.
	uint32_t threads_initialized;

	/// Array of allocated thread-specific structures. When no threads
	/// are in use (direct mode) this is NULL. In threaded mode this
	/// points to an array of threads_max number of worker_thread structs.
	struct worker_thread *threads;

	/// Stack of free threads. When a thread finishes, it puts itself
	/// back into this stack. This starts as empty because threads
	/// are created only when actually needed.
	///
	/// \note       Use mutex.
	struct worker_thread *threads_free;

	/// The most recent worker thread to which the main thread writes
	/// the new input from the application.
	struct worker_thread *thr;

	/// Output buffer queue for decompressed data from the worker threads
	///
	/// \note       Use mutex with operations that need it.
	lzma_outq outq;

	/// The main mutex (coder->mutex) and the condition variable that
	/// the worker threads signal after a partial update or when they
	/// have finished a Block. read_output_and_wait() waits on it.
	mythread_mutex mutex;
	mythread_cond cond;


	/// Memory usage that will not be exceeded in multi-threaded mode.
	/// Single-threaded mode can exceed this even by a large amount.
	uint64_t memlimit_threading;

	/// Memory usage limit that should never be exceeded.
	/// LZMA_MEMLIMIT_ERROR will be returned if decoding isn't possible
	/// even in single-threaded mode without exceeding this limit.
	uint64_t memlimit_stop;

	/// Amount of memory in use by the direct mode decoder
	/// (coder->block_decoder). In threaded mode this is 0.
	uint64_t mem_direct_mode;

	/// Amount of memory needed by the running worker threads.
	/// This doesn't include the memory needed by the output buffer.
	///
	/// \note       Use mutex.
	uint64_t mem_in_use;

	/// Amount of memory used by the idle (cached) threads.
	///
	/// \note       Use mutex.
	uint64_t mem_cached;


	/// Amount of memory needed for the filter chain of the next Block.
	uint64_t mem_next_filters;

	/// Amount of memory needed for the thread-specific input buffer
	/// for the next Block.
	uint64_t mem_next_in;

	/// Amount of memory actually needed to decode the next Block
	/// in threaded mode. This is
	/// mem_next_filters + mem_next_in + memory needed for lzma_outbuf.
	uint64_t mem_next_block;


	/// Amount of compressed data in Stream Header + Blocks that have
	/// already been finished.
	///
	/// \note       Use mutex.
	uint64_t progress_in;

	/// Amount of uncompressed data in Blocks that have already
	/// been finished.
	///
	/// \note       Use mutex.
	uint64_t progress_out;


	/// If true, LZMA_NO_CHECK is returned if the Stream has
	/// no integrity check.
	bool tell_no_check;

	/// If true, LZMA_UNSUPPORTED_CHECK is returned if the Stream has
	/// an integrity check that isn't supported by this liblzma build.
	bool tell_unsupported_check;

	/// If true, LZMA_GET_CHECK is returned after decoding Stream Header.
	bool tell_any_check;

	/// If true, we will tell the Block decoder to skip calculating
	/// and verifying the integrity check.
	bool ignore_check;

	/// If true, we will decode concatenated Streams that possibly have
	/// Stream Padding between or after them. LZMA_STREAM_END is returned
	/// once the application isn't giving us any new input (LZMA_FINISH),
	/// and we aren't in the middle of a Stream, and possible
	/// Stream Padding is a multiple of four bytes.
	bool concatenated;

	/// If true, we will return any errors immediately instead of first
	/// producing all output before the location of the error.
	bool fail_fast;


	/// When decoding concatenated Streams, this is true as long as we
	/// are decoding the first Stream. This is needed to avoid misleading
	/// LZMA_FORMAT_ERROR in case the later Streams don't have valid magic
	/// bytes.
	bool first_stream;

	/// This is used to track if the previous call to stream_decode_mt()
	/// had output space (*out_pos < out_size) and managed to fill the
	/// output buffer (*out_pos == out_size). This may be set to true
	/// in read_output_and_wait(). This is read and then reset to false
	/// at the beginning of stream_decode_mt().
	///
	/// This is needed to support applications that call lzma_code() in
	/// such a way that more input is provided only when lzma_code()
	/// didn't fill the output buffer completely. Basically, this makes
	/// it easier to convert such applications from single-threaded
	/// decoder to multi-threaded decoder.
	bool out_was_filled;

	/// Write position in buffer[] and position in Stream Padding
	size_t pos;

	/// Buffer to hold Stream Header, Block Header, and Stream Footer.
	/// Block Header has biggest maximum size.
	uint8_t buffer[LZMA_BLOCK_HEADER_SIZE_MAX];
};


/// Enables updating of outbuf->pos. This is a callback function that is
/// used with lzma_outq_enable_partial_output().
static void
worker_enable_partial_update(void *thr_ptr)
{
	// thr_ptr is the struct worker_thread whose partial updates
	// are to be enabled.
	struct worker_thread *thr = thr_ptr;

	// Set the flag while holding the worker's own mutex and wake up
	// the worker: in worker_decoder() it may be sleeping on thr->cond
	// because it has no new input, and it has to do one more run to
	// publish its current positions to the main thread.
	mythread_sync(thr->mutex) {
		thr->partial_update_enabled = true;
		mythread_cond_signal(&thr->cond);
	}
}


/// Thread function of a worker thread. It is started with mythread_create()
/// in initialize_new_thread().
///
/// thr_ptr points to the struct worker_thread of this thread. The function
/// loops until thr->state is THR_EXIT:
///
///   - THR_IDLE: Sleep on thr->cond.
///
///   - THR_RUN: Pass the input from thr->in to the Block decoder in
///     chunks of at most 16384 bytes and write the output to thr->outbuf.
///     When the Block decoder returns something else than LZMA_OK, the
///     outbuf is marked as finished, the state is set to THR_IDLE, and
///     (only if the Block was decoded successfully) this thread is put
///     to the stack of free threads.
///
///   - THR_EXIT: Free thr->in and the Block decoder, destroy thr->mutex
///     and thr->cond, and return MYTHREAD_RET_VALUE.
static MYTHREAD_RET_TYPE
worker_decoder(void *thr_ptr)
{
	struct worker_thread *thr = thr_ptr;

	// Local copies of thr->in_filled and thr->partial_update_enabled.
	// They are read while holding thr->mutex and then used after the
	// mutex has been unlocked.
	size_t in_filled;
	bool partial_update_enabled;
	lzma_ret ret;

// Jump here when thr->mutex is not held.
next_loop_lock:

	mythread_mutex_lock(&thr->mutex);

// Jump here when thr->mutex is already held, that is, after returning
// from mythread_cond_wait(). The "unlocked" in the name refers to skipping
// the locking step above, not to the state of the mutex.
next_loop_unlocked:

	if (thr->state == THR_IDLE) {
		mythread_cond_wait(&thr->cond, &thr->mutex);
		goto next_loop_unlocked;
	}

	if (thr->state == THR_EXIT) {
		mythread_mutex_unlock(&thr->mutex);

		// The thread cleans up its own resources. threads_end()
		// only joins the thread and frees the coder->threads array.
		lzma_free(thr->in, thr->allocator);
		lzma_next_end(&thr->block_decoder, thr->allocator);

		mythread_mutex_destroy(&thr->mutex);
		mythread_cond_destroy(&thr->cond);

		return MYTHREAD_RET_VALUE;
	}

	assert(thr->state == THR_RUN);

	// Update progress info for get_progress().
	thr->progress_in = thr->in_pos;
	thr->progress_out = thr->out_pos;

	// If we don't have any new input, wait for a signal from the main
	// thread except if partial output has just been enabled
	// (partial_update_enabled is true but _started is false). In that
	// case we will do one normal run so that the partial output info
	// gets passed to the main thread. The call to block_decoder.code()
	// is useless but harmless as it can occur only once per Block.
	in_filled = thr->in_filled;
	partial_update_enabled = thr->partial_update_enabled;

	if (in_filled == thr->in_pos && !(partial_update_enabled
			&& !thr->partial_update_started)) {
		mythread_cond_wait(&thr->cond, &thr->mutex);
		goto next_loop_unlocked;
	}

	mythread_mutex_unlock(&thr->mutex);

	// Pass the input in small chunks to the Block decoder.
	// This way we react reasonably fast if we are told to stop/exit,
	// and (when partial update is enabled) we tell about our progress
	// to the main thread frequently enough.
	const size_t chunk_size = 16384;
	if ((in_filled - thr->in_pos) > chunk_size)
		in_filled = thr->in_pos + chunk_size;

	// The decoder runs without any mutex held. It advances
	// thr->in_pos and thr->out_pos, which are the worker's own copies
	// of the positions.
	ret = thr->block_decoder.code(
			thr->block_decoder.coder, thr->allocator,
			thr->in, &thr->in_pos, in_filled,
			thr->outbuf->buf, &thr->out_pos,
			thr->outbuf->allocated, LZMA_RUN);

	if (ret == LZMA_OK) {
		if (partial_update_enabled) {
			// Remember that we have done at least one partial
			// update. After the first update we won't do updates
			// unless we have made progress.
			thr->partial_update_started = true;

			// The main thread is reading decompressed data
			// from thr->outbuf. Tell the main thread about
			// our progress.
			//
			// NOTE: It's possible that we consumed input without
			// producing any new output so it's possible that only
			// in_pos has changed. If thr->partial_update_started
			// was false, it is possible that neither in_pos nor
			// out_pos has changed.
			mythread_sync(thr->coder->mutex) {
				thr->outbuf->pos = thr->out_pos;
				thr->outbuf->decoder_in_pos = thr->in_pos;
				mythread_cond_signal(&thr->coder->cond);
			}
		}

		goto next_loop_lock;
	}

	// Either we finished successfully (LZMA_STREAM_END) or an error
	// occurred.
	//
	// The sizes are in the Block Header and the Block decoder
	// checks that they match, thus we know these:
	assert(ret != LZMA_STREAM_END || thr->in_pos == thr->in_size);
	assert(ret != LZMA_STREAM_END
		|| thr->out_pos == thr->block_options.uncompressed_size);

	mythread_sync(thr->mutex) {
		// Block decoder ensures this, but do a sanity check anyway
		// because thr->in_filled < thr->in_size means that the main
		// thread is still writing to thr->in.
		if (ret == LZMA_STREAM_END && thr->in_filled != thr->in_size) {
			assert(0);
			ret = LZMA_PROG_ERROR;
		}

		// Don't overwrite a pending exit request from
		// the main thread.
		if (thr->state != THR_EXIT)
			thr->state = THR_IDLE;
	}

	// Free the input buffer. Don't update in_size as we need
	// it later to update thr->coder->mem_in_use.
	//
	// This step is skipped if an error occurred because the main thread
	// might still be writing to thr->in. The memory will be freed after
	// threads_end() sets thr->state = THR_EXIT.
	if (ret == LZMA_STREAM_END) {
		lzma_free(thr->in, thr->allocator);
		thr->in = NULL;
	}

	mythread_sync(thr->coder->mutex) {
		// Move our progress info to the main thread.
		thr->coder->progress_in += thr->in_pos;
		thr->coder->progress_out += thr->out_pos;
		thr->progress_in = 0;
		thr->progress_out = 0;

		// Mark the outbuf as finished.
		thr->outbuf->pos = thr->out_pos;
		thr->outbuf->decoder_in_pos = thr->in_pos;
		thr->outbuf->finished = true;
		thr->outbuf->finish_ret = ret;
		thr->outbuf = NULL;

		// If an error occurred, tell it to the main thread.
		// Only the first error is stored.
		if (ret != LZMA_STREAM_END
				&& thr->coder->thread_error == LZMA_OK)
			thr->coder->thread_error = ret;

		// Return the worker thread to the stack of available
		// threads only if no errors occurred.
		if (ret == LZMA_STREAM_END) {
			// Update memory usage counters.
			// The input buffer was freed above; the memory of
			// the filter chain stays allocated and is now
			// counted as cached.
			thr->coder->mem_in_use -= thr->in_size;
			thr->coder->mem_in_use -= thr->mem_filters;
			thr->coder->mem_cached += thr->mem_filters;

			// Put this thread to the stack of free threads.
			thr->next = thr->coder->threads_free;
			thr->coder->threads_free = thr;
		}

		mythread_cond_signal(&thr->coder->cond);
	}

	// The state is now THR_IDLE (or THR_EXIT), so the next round
	// either sleeps on thr->cond or exits.
	goto next_loop_lock;
}


/// Tells the worker threads to exit and waits for them to terminate.
static void
threads_end(struct lzma_stream_coder *coder, const lzma_allocator *allocator)
{
	// Tell every created thread to exit before joining any of them.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_EXIT;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	for (uint32_t i = 0; i < coder->threads_initialized; ++i)
		mythread_join(coder->threads[i].thread_id);

	// Each worker thread frees its own input buffer, Block decoder,
	// mutex, and condition variable when it sees THR_EXIT (see
	// worker_decoder()), so only the array itself is freed here.
	lzma_free(coder->threads, allocator);
	coder->threads_initialized = 0;
	coder->threads = NULL;
	coder->threads_free = NULL;

	// The threads don't update these when they exit. Do it here.
	coder->mem_in_use = 0;
	coder->mem_cached = 0;

	return;
}


/// Tell worker threads to stop without doing any cleaning up.
/// The clean up will be done when threads_end() is called;
/// it's not possible to reuse the threads after threads_stop().
///
/// This is called before returning an unrecoverable error code
/// to the application. It would be waste of processor time
/// to keep the threads running in such a situation.
static void
threads_stop(struct lzma_stream_coder *coder)
{
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		// The threads that are in the THR_RUN state will stop
		// when they check the state the next time. There's no
		// need to signal coder->threads[i].cond.
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_IDLE;
		}
	}

	return;
}


/// Initialize a new worker_thread structure and create a new thread.
///
/// The next unused element of coder->threads[] is used; the array itself
/// is allocated on the first call. On success coder->thr points to the new
/// structure, coder->threads_initialized has been incremented, and LZMA_OK
/// is returned. If allocating the array or creating the mutex, the
/// condition variable, or the thread fails, LZMA_MEM_ERROR is returned.
///
/// The members in_filled, in_pos, out_pos, progress_in, progress_out,
/// partial_update_enabled, and partial_update_started aren't set here;
/// get_thread() sets them after calling this function.
static lzma_ret
initialize_new_thread(struct lzma_stream_coder *coder,
		const lzma_allocator *allocator)
{
	// Allocate the coder->threads array if needed. It's done here instead
	// of when initializing the decoder because we don't need this if we
	// use the direct mode (we may even free coder->threads in the middle
	// of the file if we switch from threaded to direct mode).
	if (coder->threads == NULL) {
		coder->threads = lzma_alloc(
			coder->threads_max * sizeof(struct worker_thread),
			allocator);

		if (coder->threads == NULL)
			return LZMA_MEM_ERROR;
	}

	// Pick a free structure.
	assert(coder->threads_initialized < coder->threads_max);
	struct worker_thread *thr
			= &coder->threads[coder->threads_initialized];

	if (mythread_mutex_init(&thr->mutex))
		goto error_mutex;

	if (mythread_cond_init(&thr->cond))
		goto error_cond;

	// The thread starts in THR_IDLE, so worker_decoder() will sleep
	// on thr->cond until the state is changed.
	thr->state = THR_IDLE;
	thr->in = NULL;
	thr->in_size = 0;
	thr->allocator = allocator;
	thr->coder = coder;
	thr->outbuf = NULL;
	thr->block_decoder = LZMA_NEXT_CODER_INIT;
	thr->mem_filters = 0;

	if (mythread_create(&thr->thread_id, worker_decoder, thr))
		goto error_thread;

	++coder->threads_initialized;
	coder->thr = thr;

	return LZMA_OK;

// Undo the initializations in the reverse order. The coder->threads array
// is kept allocated; threads_end() frees it.
error_thread:
	mythread_cond_destroy(&thr->cond);

error_cond:
	mythread_mutex_destroy(&thr->mutex);

error_mutex:
	return LZMA_MEM_ERROR;
}


/// Gets a worker thread for decoding the next Block and stores it in
/// coder->thr. A thread from the stack of free threads is used if there
/// is one; otherwise a new thread is created with initialize_new_thread().
/// The positions, the progress counters, and the partial update flags of
/// the thread are reset before returning.
///
/// Returns LZMA_OK on success or the error code from
/// initialize_new_thread().
///
/// NOTE(review): If the stack of free threads is empty, a new thread is
/// created only if coder->thr is NULL. This seems to rely on the caller
/// having set coder->thr = NULL before calling this function; the caller
/// isn't visible here, so confirm against it.
static lzma_ret
get_thread(struct lzma_stream_coder *coder, const lzma_allocator *allocator)
{
	// If there is a free structure on the stack, use it.
	mythread_sync(coder->mutex) {
		if (coder->threads_free != NULL) {
			coder->thr = coder->threads_free;
			coder->threads_free = coder->threads_free->next;

			// The thread is no longer in the cache so subtract
			// it from the cached memory usage. Don't add it
			// to mem_in_use though; the caller will handle it
			// since it knows how much memory it will actually
			// use (the filter chain might change).
			coder->mem_cached -= coder->thr->mem_filters;
		}
	}

	if (coder->thr == NULL) {
		assert(coder->threads_initialized < coder->threads_max);

		// Initialize a new thread.
		return_if_error(initialize_new_thread(coder, allocator));
	}

	coder->thr->in_filled = 0;
	coder->thr->in_pos = 0;
	coder->thr->out_pos = 0;

	coder->thr->progress_in = 0;
	coder->thr->progress_out = 0;

	coder->thr->partial_update_enabled = false;
	coder->thr->partial_update_started = false;

	return LZMA_OK;
}


/// Copies decoded data from the output queue to the caller's output buffer
/// and optionally waits for more output or for the possibility to start
/// decoding a new Block.
///
/// \param      coder       Main decoder structure
/// \param      allocator   Allocator passed to lzma_outq_read()
/// \param      out         Output buffer
/// \param      out_pos     Position in out; advanced as data is copied
/// \param      out_size    Size of the output buffer
/// \param      input_is_possible
///                         If not NULL, *input_is_possible is set to true
///                         when the memory usage limit, the output queue,
///                         and the number of threads allow starting a new
///                         Block. It is never set to false here.
/// \param      waiting_allowed
///                         If false, this function never blocks.
/// \param      wait_abs    Absolute time of the timeout. It is calculated
///                         from coder->timeout when this function is about
///                         to block for the first time.
/// \param      has_blocked Tells if wait_abs has already been calculated.
///                         It is set to true when wait_abs is calculated,
///                         so that later waits use the same deadline.
///
/// \return     LZMA_OK, LZMA_TIMED_OUT if the timeout expired, a value
///             other than LZMA_OK and LZMA_STREAM_END from
///             lzma_outq_read(), or coder->thread_error if fail_fast is
///             true. If the return value is neither LZMA_OK nor
///             LZMA_TIMED_OUT, threads_stop() has been called.
static lzma_ret
read_output_and_wait(struct lzma_stream_coder *coder,
		const lzma_allocator *allocator,
		uint8_t *restrict out, size_t *restrict out_pos,
		size_t out_size,
		bool *input_is_possible,
		bool waiting_allowed,
		mythread_condtime *wait_abs, bool *has_blocked)
{
	lzma_ret ret = LZMA_OK;

	// coder->mutex is held for the whole loop except while waiting
	// on coder->cond.
	mythread_sync(coder->mutex) {
		do {
			// Get as much output from the queue as is possible
			// without blocking.
			const size_t out_start = *out_pos;
			do {
				ret = lzma_outq_read(&coder->outq, allocator,
						out, out_pos, out_size,
						NULL, NULL);

				// If a Block was finished, tell the worker
				// thread of the next Block (if it is still
				// running) to start telling the main thread
				// when new output is available.
				if (ret == LZMA_STREAM_END)
					lzma_outq_enable_partial_output(
						&coder->outq,
						&worker_enable_partial_update);

				// Loop until a Block wasn't finished.
				// It's important to loop around even if
				// *out_pos == out_size because there could
				// be an empty Block that will return
				// LZMA_STREAM_END without needing any
				// output space.
			} while (ret == LZMA_STREAM_END);

			// Check if lzma_outq_read reported an error from
			// the Block decoder.
			if (ret != LZMA_OK)
				break;

			// If the output buffer is now full but it wasn't full
			// when this function was called, set out_was_filled.
			// This way the next call to stream_decode_mt() knows
			// that some output was produced and no output space
			// remained in the previous call to stream_decode_mt().
			if (*out_pos == out_size && *out_pos != out_start)
				coder->out_was_filled = true;

			// Check if any thread has indicated an error.
			if (coder->thread_error != LZMA_OK) {
				// If LZMA_FAIL_FAST was used, report errors
				// from worker threads immediately.
				if (coder->fail_fast) {
					ret = coder->thread_error;
					break;
				}

				// Otherwise set pending_error. The value we
				// set here will not actually get used other
				// than working as a flag that an error has
				// occurred. This is because in SEQ_ERROR
				// all output before the error will be read
				// first by calling this function, and once we
				// reach the location of the (first) error the
				// error code from the above lzma_outq_read()
				// will be returned to the application.
				//
				// Use LZMA_PROG_ERROR since the value should
				// never leak to the application. It's
				// possible that pending_error has already
				// been set but that doesn't matter: if we get
				// here, pending_error only works as a flag.
				coder->pending_error = LZMA_PROG_ERROR;
			}

			// Check if decoding of the next Block can be started.
			// The memusage of the active threads must be low
			// enough, there must be a free buffer slot in the
			// output queue, and there must be a free thread
			// (that can be either created or an existing one
			// reused).
			//
			// NOTE: This is checked after reading the output
			// above because reading the output can free a slot in
			// the output queue and also reduce active memusage.
			//
			// NOTE: If output queue is empty, then input will
			// always be possible.
			if (input_is_possible != NULL
					&& coder->memlimit_threading
						- coder->mem_in_use
						- coder->outq.mem_in_use
						>= coder->mem_next_block
					&& lzma_outq_has_buf(&coder->outq)
					&& (coder->threads_initialized
							< coder->threads_max
						|| coder->threads_free
							!= NULL)) {
				*input_is_possible = true;
				break;
			}

			// If the caller doesn't want us to block, return now.
			if (!waiting_allowed)
				break;

			// This check is needed only when input_is_possible
			// is NULL. We must return if we aren't waiting for
			// input to become possible and there is no more
			// output coming from the queue.
			if (lzma_outq_is_empty(&coder->outq)) {
				assert(input_is_possible == NULL);
				break;
			}

			// If there is more data available from the queue,
			// our out buffer must be full and we need to return
			// so that the application can provide more output
			// space.
			//
			// NOTE: In general lzma_outq_is_readable() can return
			// true also when there are no more bytes available.
			// This can happen when a Block has finished without
			// providing any new output. We know that this is not
			// the case because in the beginning of this loop we
			// tried to read as much as possible even when we had
			// no output space left and the mutex has been locked
			// all the time (so worker threads cannot have changed
			// anything). Thus there must be actual pending output
			// in the queue.
			if (lzma_outq_is_readable(&coder->outq)) {
				assert(*out_pos == out_size);
				break;
			}

			// If the application stops providing more input
			// in the middle of a Block, there will eventually
			// be one worker thread left that is stuck waiting for
			// more input (that might never arrive) and a matching
			// outbuf which the worker thread cannot finish due
			// to lack of input. We must detect this situation,
			// otherwise we would end up waiting indefinitely
			// (if no timeout is in use) or keep returning
			// LZMA_TIMED_OUT while making no progress. Thus, the
			// application would never get LZMA_BUF_ERROR from
			// lzma_code() which would tell the application that
			// no more progress is possible. No LZMA_BUF_ERROR
			// means that, for example, truncated .xz files could
			// cause an infinite loop.
			//
			// A worker thread doing partial updates will
			// store not only the output position in outbuf->pos
			// but also the matching input position in
			// outbuf->decoder_in_pos. Here we check if that
			// input position matches the amount of input that
			// the worker thread has been given (in_filled).
			// If so, we must return and not wait as no more
			// output will be coming without first getting more
			// input to the worker thread. If the application
			// keeps calling lzma_code() without providing more
			// input, it will eventually get LZMA_BUF_ERROR.
			//
			// NOTE: We can read partial_update_enabled and
			// in_filled without thr->mutex as only the main thread
			// modifies these variables. decoder_in_pos requires
			// coder->mutex which we are already holding.
			if (coder->thr != NULL &&
					coder->thr->partial_update_enabled) {
				// There is exactly one outbuf in the queue.
				assert(coder->thr->outbuf == coder->outq.head);
				assert(coder->thr->outbuf == coder->outq.tail);

				if (coder->thr->outbuf->decoder_in_pos
						== coder->thr->in_filled)
					break;
			}

			// Wait for input or output to become possible.
			if (coder->timeout != 0) {
				// See the comment in stream_encoder_mt.c
				// about why mythread_condtime_set() is used
				// like this.
				//
				// FIXME?
				// In contrast to the encoder, this calls
				// _condtime_set while the mutex is locked.
				if (!*has_blocked) {
					*has_blocked = true;
					mythread_condtime_set(wait_abs,
							&coder->cond,
							coder->timeout);
				}

				if (mythread_cond_timedwait(&coder->cond,
						&coder->mutex,
						wait_abs) != 0) {
					ret = LZMA_TIMED_OUT;
					break;
				}
			} else {
				mythread_cond_wait(&coder->cond,
						&coder->mutex);
			}
		} while (ret == LZMA_OK);
	}

	// If we are returning an error, then the application cannot get
	// more output from us and thus keeping the threads running is
	// useless and waste of CPU time.
	if (ret != LZMA_OK && ret != LZMA_TIMED_OUT)
		threads_stop(coder);

	return ret;
}


/// Collects the Block Header into coder->buffer and decodes it into
/// coder->block_options once all of it is available. The header may
/// arrive in pieces over multiple calls; coder->pos tracks how much of
/// it has been copied so far.
///
/// \return     - LZMA_OK: No input was available or the Block Header
///               isn't complete yet; more input is needed.
///             - LZMA_INDEX_DETECTED: The first byte is INDEX_INDICATOR
///               instead of the first byte of a Block Header.
///               *in_pos isn't advanced.
///             - LZMA_STREAM_END: The Block Header was decoded and
///               coder->block_options is ready.
///             - Otherwise an error code from lzma_block_header_decode().
static lzma_ret
decode_block_header(struct lzma_stream_coder *coder,
		const lzma_allocator *allocator, const uint8_t *restrict in,
		size_t *restrict in_pos, size_t in_size)
{
	if (*in_pos >= in_size)
		return LZMA_OK;

	if (coder->pos == 0) {
		// Detect if it's Index.
		if (in[*in_pos] == INDEX_INDICATOR)
			return LZMA_INDEX_DETECTED;

		// Calculate the size of the Block Header. Note that
		// Block Header decoder wants to see this byte too
		// so don't advance *in_pos.
		coder->block_options.header_size
				= lzma_block_header_size_decode(
					in[*in_pos]);
	}

	// Copy the Block Header to the internal buffer.
	lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos,
			coder->block_options.header_size);

	// Return if we didn't get the whole Block Header yet.
	if (coder->pos < coder->block_options.header_size)
		return LZMA_OK;

	coder->pos = 0;

	// Version 1 is needed to support the .ignore_check option.
	coder->block_options.version = 1;

	// Block Header decoder will initialize all members of this array
	// so we don't need to do it here.
	coder->block_options.filters = coder->filters;

	// Decode the Block Header.
	return_if_error(lzma_block_header_decode(&coder->block_options,
			allocator, coder->buffer));

	// If LZMA_IGNORE_CHECK was used, this flag needs to be set.
	// It has to be set after lzma_block_header_decode() because
	// it always resets this to false.
	coder->block_options.ignore_check = coder->ignore_check;

	// coder->block_options is ready now.
	return LZMA_STREAM_END;
}


/// Get the size of the Compressed Data + Block Padding + Check.
///
/// The result is calculated from coder->block_options.compressed_size and
/// the Check type in coder->stream_flags.
static size_t
comp_blk_size(const struct lzma_stream_coder *coder)
{
	return vli_ceil4(coder->block_options.compressed_size)
			+ lzma_check_size(coder->stream_flags.check);
}


/// Returns true if the size (compressed or uncompressed) is such that
/// threaded decompression cannot be used. Sizes that are too big compared
/// to SIZE_MAX must be rejected to avoid integer overflows and truncations
/// when lzma_vli is assigned to a size_t.
///
/// An unknown size (LZMA_VLI_UNKNOWN) requires the direct mode too.
///
/// NOTE(review): The reason for using exactly SIZE_MAX / 3 as the limit
/// isn't visible in this part of the file.
static bool
is_direct_mode_needed(lzma_vli size)
{
	return size == LZMA_VLI_UNKNOWN || size > SIZE_MAX / 3;
}


/// Prepares the decoder for decoding a Stream from its Stream Header:
/// (re)initializes coder->index_hash and resets coder->sequence and
/// coder->pos.
///
/// \return     LZMA_OK on success or LZMA_MEM_ERROR if
///             lzma_index_hash_init() returned NULL.
static lzma_ret
stream_decoder_reset(struct lzma_stream_coder *coder,
		const lzma_allocator *allocator)
{
	// Initialize the Index hash used to verify the Index.
	coder->index_hash = lzma_index_hash_init(coder->index_hash, allocator);
	if (coder->index_hash == NULL)
		return LZMA_MEM_ERROR;

	// Reset the rest of the variables.
	coder->sequence = SEQ_STREAM_HEADER;
	coder->pos = 0;

	return LZMA_OK;
}


static lzma_ret
stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
		 const uint8_t *restrict in, size_t *restrict in_pos,
		 size_t in_size,
		 uint8_t *restrict out, size_t *restrict out_pos,
		 size_t out_size, lzma_action action)
{
	struct lzma_stream_coder *coder = coder_ptr;

	mythread_condtime wait_abs;
	bool has_blocked = false;

	// Determine if in SEQ_BLOCK_HEADER and SEQ_BLOCK_THR_RUN we should
	// tell read_output_and_wait() to wait until it can fill the output
	// buffer (or a timeout occurs). Two conditions must be met:
	//
	// (1) If the caller provided no new input. The reason for this
	//     can be, for example, the end of the file or that there is
	//     a pause in the input stream and more input is available
	//     a little later. In this situation we should wait for output
	//     because otherwise we would end up in a busy-waiting loop where
	//     we make no progress and the application just calls us again
	//     without providing any new input. This would then result in
	//     LZMA_BUF_ERROR even though more output would be available
	//     once the worker threads decode more data.
	//
	// (2) Even if (1) is true, we will not wait if the previous call to
	//     this function managed to produce some output and the output
	//     buffer became full. This is for compatibility with applications
	//     that call lzma_code() in such a way that new input is provided
	//     only when the output buffer didn't become full. Without this
	//     trick such applications would have bad performance (bad
	//     parallelization due to decoder not getting input fast enough).
	//
	//     NOTE: Such loops might require that timeout is disabled (0)
	//     if they assume that output-not-full implies that all input has
	//     been consumed.
If and only if timeout is enabled, we may return 988 // when output isn't full *and* not all input has been consumed. 989 // 990 // However, if LZMA_FINISH is used, the above is ignored and we always 991 // wait (timeout can still cause us to return) because we know that 992 // we won't get any more input. This matters if the input file is 993 // truncated and we are doing single-shot decoding, that is, 994 // timeout = 0 and LZMA_FINISH is used on the first call to 995 // lzma_code() and the output buffer is known to be big enough 996 // to hold all uncompressed data: 997 // 998 // - If LZMA_FINISH wasn't handled specially, we could return 999 // LZMA_OK before providing all output that is possible with the 1000 // truncated input. The rest would be available if lzma_code() was 1001 // called again but then it's not single-shot decoding anymore. 1002 // 1003 // - By handling LZMA_FINISH specially here, the first call will 1004 // produce all the output, matching the behavior of the 1005 // single-threaded decoder. 1006 // 1007 // So it's a very specific corner case but also easy to avoid. Note 1008 // that this special handling of LZMA_FINISH has no effect for 1009 // single-shot decoding when the input file is valid (not truncated); 1010 // premature LZMA_OK wouldn't be possible as long as timeout = 0. 1011 const bool waiting_allowed = action == LZMA_FINISH 1012 || (*in_pos == in_size && !coder->out_was_filled); 1013 coder->out_was_filled = false; 1014 1015 while (true) 1016 switch (coder->sequence) { 1017 case SEQ_STREAM_HEADER: { 1018 // Copy the Stream Header to the internal buffer. 1019 const size_t in_old = *in_pos; 1020 lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos, 1021 LZMA_STREAM_HEADER_SIZE); 1022 coder->progress_in += *in_pos - in_old; 1023 1024 // Return if we didn't get the whole Stream Header yet. 1025 if (coder->pos < LZMA_STREAM_HEADER_SIZE) 1026 return LZMA_OK; 1027 1028 coder->pos = 0; 1029 1030 // Decode the Stream Header. 
1031 const lzma_ret ret = lzma_stream_header_decode( 1032 &coder->stream_flags, coder->buffer); 1033 if (ret != LZMA_OK) 1034 return ret == LZMA_FORMAT_ERROR && !coder->first_stream 1035 ? LZMA_DATA_ERROR : ret; 1036 1037 // If we are decoding concatenated Streams, and the later 1038 // Streams have invalid Header Magic Bytes, we give 1039 // LZMA_DATA_ERROR instead of LZMA_FORMAT_ERROR. 1040 coder->first_stream = false; 1041 1042 // Copy the type of the Check so that Block Header and Block 1043 // decoders see it. 1044 coder->block_options.check = coder->stream_flags.check; 1045 1046 // Even if we return LZMA_*_CHECK below, we want 1047 // to continue from Block Header decoding. 1048 coder->sequence = SEQ_BLOCK_HEADER; 1049 1050 // Detect if there's no integrity check or if it is 1051 // unsupported if those were requested by the application. 1052 if (coder->tell_no_check && coder->stream_flags.check 1053 == LZMA_CHECK_NONE) 1054 return LZMA_NO_CHECK; 1055 1056 if (coder->tell_unsupported_check 1057 && !lzma_check_is_supported( 1058 coder->stream_flags.check)) 1059 return LZMA_UNSUPPORTED_CHECK; 1060 1061 if (coder->tell_any_check) 1062 return LZMA_GET_CHECK; 1063 1064 FALLTHROUGH; 1065 } 1066 1067 case SEQ_BLOCK_HEADER: { 1068 const size_t in_old = *in_pos; 1069 const lzma_ret ret = decode_block_header(coder, allocator, 1070 in, in_pos, in_size); 1071 coder->progress_in += *in_pos - in_old; 1072 1073 if (ret == LZMA_OK) { 1074 // We didn't decode the whole Block Header yet. 1075 // 1076 // Read output from the queue before returning. This 1077 // is important because it is possible that the 1078 // application doesn't have any new input available 1079 // immediately. If we didn't try to copy output from 1080 // the output queue here, lzma_code() could end up 1081 // returning LZMA_BUF_ERROR even though queued output 1082 // is available. 
1083 // 1084 // If the lzma_code() call provided at least one input 1085 // byte, only copy as much data from the output queue 1086 // as is available immediately. This way the 1087 // application will be able to provide more input 1088 // without a delay. 1089 // 1090 // On the other hand, if lzma_code() was called with 1091 // an empty input buffer(*), treat it specially: try 1092 // to fill the output buffer even if it requires 1093 // waiting for the worker threads to provide output 1094 // (timeout, if specified, can still cause us to 1095 // return). 1096 // 1097 // - This way the application will be able to get all 1098 // data that can be decoded from the input provided 1099 // so far. 1100 // 1101 // - We avoid both premature LZMA_BUF_ERROR and 1102 // busy-waiting where the application repeatedly 1103 // calls lzma_code() which immediately returns 1104 // LZMA_OK without providing new data. 1105 // 1106 // - If the queue becomes empty, we won't wait 1107 // anything and will return LZMA_OK immediately 1108 // (coder->timeout is completely ignored). 1109 // 1110 // (*) See the comment at the beginning of this 1111 // function how waiting_allowed is determined 1112 // and why there is an exception to the rule 1113 // of "called with an empty input buffer". 1114 assert(*in_pos == in_size); 1115 1116 // If LZMA_FINISH was used we know that we won't get 1117 // more input, so the file must be truncated if we 1118 // get here. If worker threads don't detect any 1119 // errors, eventually there will be no more output 1120 // while we keep returning LZMA_OK which gets 1121 // converted to LZMA_BUF_ERROR in lzma_code(). 1122 // 1123 // If fail-fast is enabled then we will return 1124 // immediately using LZMA_DATA_ERROR instead of 1125 // LZMA_OK or LZMA_BUF_ERROR. Rationale for the 1126 // error code: 1127 // 1128 // - Worker threads may have a large amount of 1129 // not-yet-decoded input data and we don't 1130 // know for sure if all data is valid. 
Bad 1131 // data there would result in LZMA_DATA_ERROR 1132 // when fail-fast isn't used. 1133 // 1134 // - Immediate LZMA_BUF_ERROR would be a bit weird 1135 // considering the older liblzma code. lzma_code() 1136 // even has an assertion to prevent coders from 1137 // returning LZMA_BUF_ERROR directly. 1138 // 1139 // The downside of this is that with fail-fast apps 1140 // cannot always distinguish between corrupt and 1141 // truncated files. 1142 if (action == LZMA_FINISH && coder->fail_fast) { 1143 // We won't produce any more output. Stop 1144 // the unfinished worker threads so they 1145 // won't waste CPU time. 1146 threads_stop(coder); 1147 return LZMA_DATA_ERROR; 1148 } 1149 1150 // read_output_and_wait() will call threads_stop() 1151 // if needed so with that we can use return_if_error. 1152 return_if_error(read_output_and_wait(coder, allocator, 1153 out, out_pos, out_size, 1154 NULL, waiting_allowed, 1155 &wait_abs, &has_blocked)); 1156 1157 if (coder->pending_error != LZMA_OK) { 1158 coder->sequence = SEQ_ERROR; 1159 break; 1160 } 1161 1162 return LZMA_OK; 1163 } 1164 1165 if (ret == LZMA_INDEX_DETECTED) { 1166 coder->sequence = SEQ_INDEX_WAIT_OUTPUT; 1167 break; 1168 } 1169 1170 // See if an error occurred. 1171 if (ret != LZMA_STREAM_END) { 1172 // NOTE: Here and in all other places where 1173 // pending_error is set, it may overwrite the value 1174 // (LZMA_PROG_ERROR) set by read_output_and_wait(). 1175 // That function might overwrite value set here too. 1176 // These are fine because when read_output_and_wait() 1177 // sets pending_error, it actually works as a flag 1178 // variable only ("some error has occurred") and the 1179 // actual value of pending_error is not used in 1180 // SEQ_ERROR. In such cases SEQ_ERROR will eventually 1181 // get the correct error code from the return value of 1182 // a later read_output_and_wait() call. 
1183 coder->pending_error = ret; 1184 coder->sequence = SEQ_ERROR; 1185 break; 1186 } 1187 1188 // Calculate the memory usage of the filters / Block decoder. 1189 coder->mem_next_filters = lzma_raw_decoder_memusage( 1190 coder->filters); 1191 1192 if (coder->mem_next_filters == UINT64_MAX) { 1193 // One or more unknown Filter IDs. 1194 coder->pending_error = LZMA_OPTIONS_ERROR; 1195 coder->sequence = SEQ_ERROR; 1196 break; 1197 } 1198 1199 coder->sequence = SEQ_BLOCK_INIT; 1200 FALLTHROUGH; 1201 } 1202 1203 case SEQ_BLOCK_INIT: { 1204 // Check if decoding is possible at all with the current 1205 // memlimit_stop which we must never exceed. 1206 // 1207 // This needs to be the first thing in SEQ_BLOCK_INIT 1208 // to make it possible to restart decoding after increasing 1209 // memlimit_stop with lzma_memlimit_set(). 1210 if (coder->mem_next_filters > coder->memlimit_stop) { 1211 // Flush pending output before returning 1212 // LZMA_MEMLIMIT_ERROR. If the application doesn't 1213 // want to increase the limit, at least it will get 1214 // all the output possible so far. 1215 return_if_error(read_output_and_wait(coder, allocator, 1216 out, out_pos, out_size, 1217 NULL, true, &wait_abs, &has_blocked)); 1218 1219 if (!lzma_outq_is_empty(&coder->outq)) 1220 return LZMA_OK; 1221 1222 return LZMA_MEMLIMIT_ERROR; 1223 } 1224 1225 // Check if the size information is available in Block Header. 1226 // If it is, check if the sizes are small enough that we don't 1227 // need to worry *too* much about integer overflows later in 1228 // the code. If these conditions are not met, we must use the 1229 // single-threaded direct mode. 1230 if (is_direct_mode_needed(coder->block_options.compressed_size) 1231 || is_direct_mode_needed( 1232 coder->block_options.uncompressed_size)) { 1233 coder->sequence = SEQ_BLOCK_DIRECT_INIT; 1234 break; 1235 } 1236 1237 // Calculate the amount of memory needed for the input and 1238 // output buffers in threaded mode. 
1239 // 1240 // These cannot overflow because we already checked that 1241 // the sizes are small enough using is_direct_mode_needed(). 1242 coder->mem_next_in = comp_blk_size(coder); 1243 const uint64_t mem_buffers = coder->mem_next_in 1244 + lzma_outq_outbuf_memusage( 1245 coder->block_options.uncompressed_size); 1246 1247 // Add the amount needed by the filters. 1248 // Avoid integer overflows. 1249 if (UINT64_MAX - mem_buffers < coder->mem_next_filters) { 1250 // Use direct mode if the memusage would overflow. 1251 // This is a theoretical case that shouldn't happen 1252 // in practice unless the input file is weird (broken 1253 // or malicious). 1254 coder->sequence = SEQ_BLOCK_DIRECT_INIT; 1255 break; 1256 } 1257 1258 // Amount of memory needed to decode this Block in 1259 // threaded mode: 1260 coder->mem_next_block = coder->mem_next_filters + mem_buffers; 1261 1262 // If this alone would exceed memlimit_threading, then we must 1263 // use the single-threaded direct mode. 1264 if (coder->mem_next_block > coder->memlimit_threading) { 1265 coder->sequence = SEQ_BLOCK_DIRECT_INIT; 1266 break; 1267 } 1268 1269 // Use the threaded mode. Free the direct mode decoder in 1270 // case it has been initialized. 1271 lzma_next_end(&coder->block_decoder, allocator); 1272 coder->mem_direct_mode = 0; 1273 1274 // Since we already know what the sizes are supposed to be, 1275 // we can already add them to the Index hash. The Block 1276 // decoder will verify the values while decoding. 
1277 const lzma_ret ret = lzma_index_hash_append(coder->index_hash, 1278 lzma_block_unpadded_size( 1279 &coder->block_options), 1280 coder->block_options.uncompressed_size); 1281 if (ret != LZMA_OK) { 1282 coder->pending_error = ret; 1283 coder->sequence = SEQ_ERROR; 1284 break; 1285 } 1286 1287 coder->sequence = SEQ_BLOCK_THR_INIT; 1288 FALLTHROUGH; 1289 } 1290 1291 case SEQ_BLOCK_THR_INIT: { 1292 // We need to wait for a multiple conditions to become true 1293 // until we can initialize the Block decoder and let a worker 1294 // thread decode it: 1295 // 1296 // - Wait for the memory usage of the active threads to drop 1297 // so that starting the decoding of this Block won't make 1298 // us go over memlimit_threading. 1299 // 1300 // - Wait for at least one free output queue slot. 1301 // 1302 // - Wait for a free worker thread. 1303 // 1304 // While we wait, we must copy decompressed data to the out 1305 // buffer and catch possible decoder errors. 1306 // 1307 // read_output_and_wait() does all the above. 1308 bool block_can_start = false; 1309 1310 return_if_error(read_output_and_wait(coder, allocator, 1311 out, out_pos, out_size, 1312 &block_can_start, true, 1313 &wait_abs, &has_blocked)); 1314 1315 if (coder->pending_error != LZMA_OK) { 1316 coder->sequence = SEQ_ERROR; 1317 break; 1318 } 1319 1320 if (!block_can_start) { 1321 // It's not a timeout because return_if_error handles 1322 // it already. Output queue cannot be empty either 1323 // because in that case block_can_start would have 1324 // been true. Thus the output buffer must be full and 1325 // the queue isn't empty. 1326 assert(*out_pos == out_size); 1327 assert(!lzma_outq_is_empty(&coder->outq)); 1328 return LZMA_OK; 1329 } 1330 1331 // We know that we can start decoding this Block without 1332 // exceeding memlimit_threading. However, to stay below 1333 // memlimit_threading may require freeing some of the 1334 // cached memory. 
1335 // 1336 // Get a local copy of variables that require locking the 1337 // mutex. It is fine if the worker threads modify the real 1338 // values after we read these as those changes can only be 1339 // towards more favorable conditions (less memory in use, 1340 // more in cache). 1341 // 1342 // These are initialized to silence warnings. 1343 uint64_t mem_in_use = 0; 1344 uint64_t mem_cached = 0; 1345 struct worker_thread *thr = NULL; 1346 1347 mythread_sync(coder->mutex) { 1348 mem_in_use = coder->mem_in_use; 1349 mem_cached = coder->mem_cached; 1350 thr = coder->threads_free; 1351 } 1352 1353 // The maximum amount of memory that can be held by other 1354 // threads and cached buffers while allowing us to start 1355 // decoding the next Block. 1356 const uint64_t mem_max = coder->memlimit_threading 1357 - coder->mem_next_block; 1358 1359 // If the existing allocations are so large that starting 1360 // to decode this Block might exceed memlimit_threads, 1361 // try to free memory from the output queue cache first. 1362 // 1363 // NOTE: This math assumes the worst case. It's possible 1364 // that the limit wouldn't be exceeded if the existing cached 1365 // allocations are reused. 1366 if (mem_in_use + mem_cached + coder->outq.mem_allocated 1367 > mem_max) { 1368 // Clear the outq cache except leave one buffer in 1369 // the cache if its size is correct. That way we 1370 // don't free and almost immediately reallocate 1371 // an identical buffer. 1372 lzma_outq_clear_cache2(&coder->outq, allocator, 1373 coder->block_options.uncompressed_size); 1374 } 1375 1376 // If there is at least one worker_thread in the cache and 1377 // the existing allocations are so large that starting to 1378 // decode this Block might exceed memlimit_threads, free 1379 // memory by freeing cached Block decoders. 1380 // 1381 // NOTE: The comparison is different here than above. 
1382 // Here we don't care about cached buffers in outq anymore 1383 // and only look at memory actually in use. This is because 1384 // if there is something in outq cache, it's a single buffer 1385 // that can be used as is. We ensured this in the above 1386 // if-block. 1387 uint64_t mem_freed = 0; 1388 if (thr != NULL && mem_in_use + mem_cached 1389 + coder->outq.mem_in_use > mem_max) { 1390 // Don't free the first Block decoder if its memory 1391 // usage isn't greater than what this Block will need. 1392 // Typically the same filter chain is used for all 1393 // Blocks so this way the allocations can be reused 1394 // when get_thread() picks the first worker_thread 1395 // from the cache. 1396 if (thr->mem_filters <= coder->mem_next_filters) 1397 thr = thr->next; 1398 1399 while (thr != NULL) { 1400 lzma_next_end(&thr->block_decoder, allocator); 1401 mem_freed += thr->mem_filters; 1402 thr->mem_filters = 0; 1403 thr = thr->next; 1404 } 1405 } 1406 1407 // Update the memory usage counters. Note that coder->mem_* 1408 // may have changed since we read them so we must subtract 1409 // or add the changes. 1410 mythread_sync(coder->mutex) { 1411 coder->mem_cached -= mem_freed; 1412 1413 // Memory needed for the filters and the input buffer. 1414 // The output queue takes care of its own counter so 1415 // we don't touch it here. 1416 // 1417 // NOTE: After this, coder->mem_in_use + 1418 // coder->mem_cached might count the same thing twice. 1419 // If so, this will get corrected in get_thread() when 1420 // a worker_thread is picked from coder->free_threads 1421 // and its memory usage is subtracted from mem_cached. 1422 coder->mem_in_use += coder->mem_next_in 1423 + coder->mem_next_filters; 1424 } 1425 1426 // Allocate memory for the output buffer in the output queue. 
1427 lzma_ret ret = lzma_outq_prealloc_buf( 1428 &coder->outq, allocator, 1429 coder->block_options.uncompressed_size); 1430 if (ret != LZMA_OK) { 1431 threads_stop(coder); 1432 return ret; 1433 } 1434 1435 // Set up coder->thr. 1436 ret = get_thread(coder, allocator); 1437 if (ret != LZMA_OK) { 1438 threads_stop(coder); 1439 return ret; 1440 } 1441 1442 // The new Block decoder memory usage is already counted in 1443 // coder->mem_in_use. Store it in the thread too. 1444 coder->thr->mem_filters = coder->mem_next_filters; 1445 1446 // Initialize the Block decoder. 1447 coder->thr->block_options = coder->block_options; 1448 ret = lzma_block_decoder_init( 1449 &coder->thr->block_decoder, allocator, 1450 &coder->thr->block_options); 1451 1452 // Free the allocated filter options since they are needed 1453 // only to initialize the Block decoder. 1454 lzma_filters_free(coder->filters, allocator); 1455 coder->thr->block_options.filters = NULL; 1456 1457 // Check if memory usage calculation and Block encoder 1458 // initialization succeeded. 1459 if (ret != LZMA_OK) { 1460 coder->pending_error = ret; 1461 coder->sequence = SEQ_ERROR; 1462 break; 1463 } 1464 1465 // Allocate the input buffer. 1466 coder->thr->in_size = coder->mem_next_in; 1467 coder->thr->in = lzma_alloc(coder->thr->in_size, allocator); 1468 if (coder->thr->in == NULL) { 1469 threads_stop(coder); 1470 return LZMA_MEM_ERROR; 1471 } 1472 1473 // Get the preallocated output buffer. 1474 coder->thr->outbuf = lzma_outq_get_buf( 1475 &coder->outq, coder->thr); 1476 1477 // Start the decoder. 1478 mythread_sync(coder->thr->mutex) { 1479 assert(coder->thr->state == THR_IDLE); 1480 coder->thr->state = THR_RUN; 1481 mythread_cond_signal(&coder->thr->cond); 1482 } 1483 1484 // Enable output from the thread that holds the oldest output 1485 // buffer in the output queue (if such a thread exists). 
1486 mythread_sync(coder->mutex) { 1487 lzma_outq_enable_partial_output(&coder->outq, 1488 &worker_enable_partial_update); 1489 } 1490 1491 coder->sequence = SEQ_BLOCK_THR_RUN; 1492 FALLTHROUGH; 1493 } 1494 1495 case SEQ_BLOCK_THR_RUN: { 1496 if (action == LZMA_FINISH && coder->fail_fast) { 1497 // We know that we won't get more input and that 1498 // the caller wants fail-fast behavior. If we see 1499 // that we don't have enough input to finish this 1500 // Block, return LZMA_DATA_ERROR immediately. 1501 // See SEQ_BLOCK_HEADER for the error code rationale. 1502 const size_t in_avail = in_size - *in_pos; 1503 const size_t in_needed = coder->thr->in_size 1504 - coder->thr->in_filled; 1505 if (in_avail < in_needed) { 1506 threads_stop(coder); 1507 return LZMA_DATA_ERROR; 1508 } 1509 } 1510 1511 // Copy input to the worker thread. 1512 size_t cur_in_filled = coder->thr->in_filled; 1513 lzma_bufcpy(in, in_pos, in_size, coder->thr->in, 1514 &cur_in_filled, coder->thr->in_size); 1515 1516 // Tell the thread how much we copied. 1517 mythread_sync(coder->thr->mutex) { 1518 coder->thr->in_filled = cur_in_filled; 1519 1520 // NOTE: Most of the time we are copying input faster 1521 // than the thread can decode so most of the time 1522 // calling mythread_cond_signal() is useless but 1523 // we cannot make it conditional because thr->in_pos 1524 // is updated without a mutex. And the overhead should 1525 // be very much negligible anyway. 1526 mythread_cond_signal(&coder->thr->cond); 1527 } 1528 1529 // Read output from the output queue. Just like in 1530 // SEQ_BLOCK_HEADER, we wait to fill the output buffer 1531 // only if waiting_allowed was set to true in the beginning 1532 // of this function (see the comment there) and there is 1533 // no input available. In SEQ_BLOCK_HEADER, there is never 1534 // input available when read_output_and_wait() is called, 1535 // but here there can be when LZMA_FINISH is used, thus we 1536 // need to check if *in_pos == in_size. 
Otherwise we would 1537 // wait here instead of using the available input to start 1538 // a new thread. 1539 return_if_error(read_output_and_wait(coder, allocator, 1540 out, out_pos, out_size, 1541 NULL, 1542 waiting_allowed && *in_pos == in_size, 1543 &wait_abs, &has_blocked)); 1544 1545 if (coder->pending_error != LZMA_OK) { 1546 coder->sequence = SEQ_ERROR; 1547 break; 1548 } 1549 1550 // Return if the input didn't contain the whole Block. 1551 // 1552 // NOTE: When we updated coder->thr->in_filled a few lines 1553 // above, the worker thread might by now have finished its 1554 // work and returned itself back to the stack of free threads. 1555 if (coder->thr->in_filled < coder->thr->in_size) { 1556 assert(*in_pos == in_size); 1557 return LZMA_OK; 1558 } 1559 1560 // The whole Block has been copied to the thread-specific 1561 // buffer. Continue from the next Block Header or Index. 1562 coder->thr = NULL; 1563 coder->sequence = SEQ_BLOCK_HEADER; 1564 break; 1565 } 1566 1567 case SEQ_BLOCK_DIRECT_INIT: { 1568 // Wait for the threads to finish and that all decoded data 1569 // has been copied to the output. That is, wait until the 1570 // output queue becomes empty. 1571 // 1572 // NOTE: No need to check for coder->pending_error as 1573 // we aren't consuming any input until the queue is empty 1574 // and if there is a pending error, read_output_and_wait() 1575 // will eventually return it before the queue is empty. 1576 return_if_error(read_output_and_wait(coder, allocator, 1577 out, out_pos, out_size, 1578 NULL, true, &wait_abs, &has_blocked)); 1579 if (!lzma_outq_is_empty(&coder->outq)) 1580 return LZMA_OK; 1581 1582 // Free the cached output buffers. 1583 lzma_outq_clear_cache(&coder->outq, allocator); 1584 1585 // Get rid of the worker threads, including the coder->threads 1586 // array. 1587 threads_end(coder, allocator); 1588 1589 // Initialize the Block decoder. 
1590 const lzma_ret ret = lzma_block_decoder_init( 1591 &coder->block_decoder, allocator, 1592 &coder->block_options); 1593 1594 // Free the allocated filter options since they are needed 1595 // only to initialize the Block decoder. 1596 lzma_filters_free(coder->filters, allocator); 1597 coder->block_options.filters = NULL; 1598 1599 // Check if Block decoder initialization succeeded. 1600 if (ret != LZMA_OK) 1601 return ret; 1602 1603 // Make the memory usage visible to _memconfig(). 1604 coder->mem_direct_mode = coder->mem_next_filters; 1605 1606 coder->sequence = SEQ_BLOCK_DIRECT_RUN; 1607 FALLTHROUGH; 1608 } 1609 1610 case SEQ_BLOCK_DIRECT_RUN: { 1611 const size_t in_old = *in_pos; 1612 const size_t out_old = *out_pos; 1613 const lzma_ret ret = coder->block_decoder.code( 1614 coder->block_decoder.coder, allocator, 1615 in, in_pos, in_size, out, out_pos, out_size, 1616 action); 1617 coder->progress_in += *in_pos - in_old; 1618 coder->progress_out += *out_pos - out_old; 1619 1620 if (ret != LZMA_STREAM_END) 1621 return ret; 1622 1623 // Block decoded successfully. Add the new size pair to 1624 // the Index hash. 1625 return_if_error(lzma_index_hash_append(coder->index_hash, 1626 lzma_block_unpadded_size( 1627 &coder->block_options), 1628 coder->block_options.uncompressed_size)); 1629 1630 coder->sequence = SEQ_BLOCK_HEADER; 1631 break; 1632 } 1633 1634 case SEQ_INDEX_WAIT_OUTPUT: 1635 // Flush the output from all worker threads so that we can 1636 // decode the Index without thinking about threading. 1637 return_if_error(read_output_and_wait(coder, allocator, 1638 out, out_pos, out_size, 1639 NULL, true, &wait_abs, &has_blocked)); 1640 1641 if (!lzma_outq_is_empty(&coder->outq)) 1642 return LZMA_OK; 1643 1644 coder->sequence = SEQ_INDEX_DECODE; 1645 FALLTHROUGH; 1646 1647 case SEQ_INDEX_DECODE: { 1648 // If we don't have any input, don't call 1649 // lzma_index_hash_decode() since it would return 1650 // LZMA_BUF_ERROR, which we must not do here. 
1651 if (*in_pos >= in_size) 1652 return LZMA_OK; 1653 1654 // Decode the Index and compare it to the hash calculated 1655 // from the sizes of the Blocks (if any). 1656 const size_t in_old = *in_pos; 1657 const lzma_ret ret = lzma_index_hash_decode(coder->index_hash, 1658 in, in_pos, in_size); 1659 coder->progress_in += *in_pos - in_old; 1660 if (ret != LZMA_STREAM_END) 1661 return ret; 1662 1663 coder->sequence = SEQ_STREAM_FOOTER; 1664 FALLTHROUGH; 1665 } 1666 1667 case SEQ_STREAM_FOOTER: { 1668 // Copy the Stream Footer to the internal buffer. 1669 const size_t in_old = *in_pos; 1670 lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos, 1671 LZMA_STREAM_HEADER_SIZE); 1672 coder->progress_in += *in_pos - in_old; 1673 1674 // Return if we didn't get the whole Stream Footer yet. 1675 if (coder->pos < LZMA_STREAM_HEADER_SIZE) 1676 return LZMA_OK; 1677 1678 coder->pos = 0; 1679 1680 // Decode the Stream Footer. The decoder gives 1681 // LZMA_FORMAT_ERROR if the magic bytes don't match, 1682 // so convert that return code to LZMA_DATA_ERROR. 1683 lzma_stream_flags footer_flags; 1684 const lzma_ret ret = lzma_stream_footer_decode( 1685 &footer_flags, coder->buffer); 1686 if (ret != LZMA_OK) 1687 return ret == LZMA_FORMAT_ERROR 1688 ? LZMA_DATA_ERROR : ret; 1689 1690 // Check that Index Size stored in the Stream Footer matches 1691 // the real size of the Index field. 1692 if (lzma_index_hash_size(coder->index_hash) 1693 != footer_flags.backward_size) 1694 return LZMA_DATA_ERROR; 1695 1696 // Compare that the Stream Flags fields are identical in 1697 // both Stream Header and Stream Footer. 1698 return_if_error(lzma_stream_flags_compare( 1699 &coder->stream_flags, &footer_flags)); 1700 1701 if (!coder->concatenated) 1702 return LZMA_STREAM_END; 1703 1704 coder->sequence = SEQ_STREAM_PADDING; 1705 FALLTHROUGH; 1706 } 1707 1708 case SEQ_STREAM_PADDING: 1709 assert(coder->concatenated); 1710 1711 // Skip over possible Stream Padding. 
1712 while (true) { 1713 if (*in_pos >= in_size) { 1714 // Unless LZMA_FINISH was used, we cannot 1715 // know if there's more input coming later. 1716 if (action != LZMA_FINISH) 1717 return LZMA_OK; 1718 1719 // Stream Padding must be a multiple of 1720 // four bytes. 1721 return coder->pos == 0 1722 ? LZMA_STREAM_END 1723 : LZMA_DATA_ERROR; 1724 } 1725 1726 // If the byte is not zero, it probably indicates 1727 // beginning of a new Stream (or the file is corrupt). 1728 if (in[*in_pos] != 0x00) 1729 break; 1730 1731 ++*in_pos; 1732 ++coder->progress_in; 1733 coder->pos = (coder->pos + 1) & 3; 1734 } 1735 1736 // Stream Padding must be a multiple of four bytes (empty 1737 // Stream Padding is OK). 1738 if (coder->pos != 0) { 1739 ++*in_pos; 1740 ++coder->progress_in; 1741 return LZMA_DATA_ERROR; 1742 } 1743 1744 // Prepare to decode the next Stream. 1745 return_if_error(stream_decoder_reset(coder, allocator)); 1746 break; 1747 1748 case SEQ_ERROR: 1749 if (!coder->fail_fast) { 1750 // Let the application get all data before the point 1751 // where the error was detected. This matches the 1752 // behavior of single-threaded use. 1753 // 1754 // FIXME? Some errors (LZMA_MEM_ERROR) don't get here, 1755 // they are returned immediately. Thus in rare cases 1756 // the output will be less than in the single-threaded 1757 // mode. Maybe this doesn't matter much in practice. 1758 return_if_error(read_output_and_wait(coder, allocator, 1759 out, out_pos, out_size, 1760 NULL, true, &wait_abs, &has_blocked)); 1761 1762 // We get here only if the error happened in the main 1763 // thread, for example, unsupported Block Header. 1764 if (!lzma_outq_is_empty(&coder->outq)) 1765 return LZMA_OK; 1766 } 1767 1768 // We only get here if no errors were detected by the worker 1769 // threads. Errors from worker threads would have already been 1770 // returned by the call to read_output_and_wait() above. 
1771 return coder->pending_error; 1772 1773 default: 1774 assert(0); 1775 return LZMA_PROG_ERROR; 1776 } 1777 1778 // Never reached 1779 } 1780 1781 1782 static void 1783 stream_decoder_mt_end(void *coder_ptr, const lzma_allocator *allocator) 1784 { 1785 struct lzma_stream_coder *coder = coder_ptr; 1786 1787 threads_end(coder, allocator); 1788 lzma_outq_end(&coder->outq, allocator); 1789 1790 lzma_next_end(&coder->block_decoder, allocator); 1791 lzma_filters_free(coder->filters, allocator); 1792 lzma_index_hash_end(coder->index_hash, allocator); 1793 1794 lzma_free(coder, allocator); 1795 return; 1796 } 1797 1798 1799 static lzma_check 1800 stream_decoder_mt_get_check(const void *coder_ptr) 1801 { 1802 const struct lzma_stream_coder *coder = coder_ptr; 1803 return coder->stream_flags.check; 1804 } 1805 1806 1807 static lzma_ret 1808 stream_decoder_mt_memconfig(void *coder_ptr, uint64_t *memusage, 1809 uint64_t *old_memlimit, uint64_t new_memlimit) 1810 { 1811 // NOTE: This function gets/sets memlimit_stop. For now, 1812 // memlimit_threading cannot be modified after initialization. 1813 // 1814 // *memusage will include cached memory too. Excluding cached memory 1815 // would be misleading and it wouldn't help the applications to 1816 // know how much memory is actually needed to decompress the file 1817 // because the higher the number of threads and the memlimits are 1818 // the more memory the decoder may use. 1819 // 1820 // Setting a new limit includes the cached memory too and too low 1821 // limits will be rejected. Alternative could be to free the cached 1822 // memory immediately if that helps to bring the limit down but 1823 // the current way is the simplest. It's unlikely that limit needs 1824 // to be lowered in the middle of a file anyway; the typical reason 1825 // to want a new limit is to increase after LZMA_MEMLIMIT_ERROR 1826 // and even such use isn't common. 
1827 struct lzma_stream_coder *coder = coder_ptr; 1828 1829 mythread_sync(coder->mutex) { 1830 *memusage = coder->mem_direct_mode 1831 + coder->mem_in_use 1832 + coder->mem_cached 1833 + coder->outq.mem_allocated; 1834 } 1835 1836 // If no filter chains are allocated, *memusage may be zero. 1837 // Always return at least LZMA_MEMUSAGE_BASE. 1838 if (*memusage < LZMA_MEMUSAGE_BASE) 1839 *memusage = LZMA_MEMUSAGE_BASE; 1840 1841 *old_memlimit = coder->memlimit_stop; 1842 1843 if (new_memlimit != 0) { 1844 if (new_memlimit < *memusage) 1845 return LZMA_MEMLIMIT_ERROR; 1846 1847 coder->memlimit_stop = new_memlimit; 1848 } 1849 1850 return LZMA_OK; 1851 } 1852 1853 1854 static void 1855 stream_decoder_mt_get_progress(void *coder_ptr, 1856 uint64_t *progress_in, uint64_t *progress_out) 1857 { 1858 struct lzma_stream_coder *coder = coder_ptr; 1859 1860 // Lock coder->mutex to prevent finishing threads from moving their 1861 // progress info from the worker_thread structure to lzma_stream_coder. 
1862 mythread_sync(coder->mutex) { 1863 *progress_in = coder->progress_in; 1864 *progress_out = coder->progress_out; 1865 1866 for (size_t i = 0; i < coder->threads_initialized; ++i) { 1867 mythread_sync(coder->threads[i].mutex) { 1868 *progress_in += coder->threads[i].progress_in; 1869 *progress_out += coder->threads[i] 1870 .progress_out; 1871 } 1872 } 1873 } 1874 1875 return; 1876 } 1877 1878 1879 static lzma_ret 1880 stream_decoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, 1881 const lzma_mt *options) 1882 { 1883 struct lzma_stream_coder *coder; 1884 1885 if (options->threads == 0 || options->threads > LZMA_THREADS_MAX) 1886 return LZMA_OPTIONS_ERROR; 1887 1888 if (options->flags & ~LZMA_SUPPORTED_FLAGS) 1889 return LZMA_OPTIONS_ERROR; 1890 1891 lzma_next_coder_init(&stream_decoder_mt_init, next, allocator); 1892 1893 coder = next->coder; 1894 if (!coder) { 1895 coder = lzma_alloc(sizeof(struct lzma_stream_coder), allocator); 1896 if (coder == NULL) 1897 return LZMA_MEM_ERROR; 1898 1899 next->coder = coder; 1900 1901 if (mythread_mutex_init(&coder->mutex)) { 1902 lzma_free(coder, allocator); 1903 return LZMA_MEM_ERROR; 1904 } 1905 1906 if (mythread_cond_init(&coder->cond)) { 1907 mythread_mutex_destroy(&coder->mutex); 1908 lzma_free(coder, allocator); 1909 return LZMA_MEM_ERROR; 1910 } 1911 1912 next->code = &stream_decode_mt; 1913 next->end = &stream_decoder_mt_end; 1914 next->get_check = &stream_decoder_mt_get_check; 1915 next->memconfig = &stream_decoder_mt_memconfig; 1916 next->get_progress = &stream_decoder_mt_get_progress; 1917 1918 coder->filters[0].id = LZMA_VLI_UNKNOWN; 1919 memzero(&coder->outq, sizeof(coder->outq)); 1920 1921 coder->block_decoder = LZMA_NEXT_CODER_INIT; 1922 coder->mem_direct_mode = 0; 1923 1924 coder->index_hash = NULL; 1925 coder->threads = NULL; 1926 coder->threads_free = NULL; 1927 coder->threads_initialized = 0; 1928 } 1929 1930 // Cleanup old filter chain if one remains after unfinished decoding 1931 // of 
a previous Stream. 1932 lzma_filters_free(coder->filters, allocator); 1933 1934 // By allocating threads from scratch we can start memory-usage 1935 // accounting from scratch, too. Changes in filter and block sizes may 1936 // affect number of threads. 1937 // 1938 // Reusing threads doesn't seem worth it. Unlike the single-threaded 1939 // decoder, with some types of input file combinations reusing 1940 // could leave quite a lot of memory allocated but unused (first 1941 // file could allocate a lot, the next files could use fewer 1942 // threads and some of the allocations from the first file would not 1943 // get freed unless memlimit_threading forces us to clear caches). 1944 // 1945 // NOTE: The direct mode decoder isn't freed here if one exists. 1946 // It will be reused or freed as needed in the main loop. 1947 threads_end(coder, allocator); 1948 1949 // All memusage counters start at 0 (including mem_direct_mode). 1950 // The little extra that is needed for the structs in this file 1951 // get accounted well enough by the filter chain memory usage 1952 // which adds LZMA_MEMUSAGE_BASE for each chain. However, 1953 // stream_decoder_mt_memconfig() has to handle this specially so that 1954 // it will never return less than LZMA_MEMUSAGE_BASE as memory usage. 
1955 coder->mem_in_use = 0; 1956 coder->mem_cached = 0; 1957 coder->mem_next_block = 0; 1958 1959 coder->progress_in = 0; 1960 coder->progress_out = 0; 1961 1962 coder->sequence = SEQ_STREAM_HEADER; 1963 coder->thread_error = LZMA_OK; 1964 coder->pending_error = LZMA_OK; 1965 coder->thr = NULL; 1966 1967 coder->timeout = options->timeout; 1968 1969 coder->memlimit_threading = my_max(1, options->memlimit_threading); 1970 coder->memlimit_stop = my_max(1, options->memlimit_stop); 1971 if (coder->memlimit_threading > coder->memlimit_stop) 1972 coder->memlimit_threading = coder->memlimit_stop; 1973 1974 coder->tell_no_check = (options->flags & LZMA_TELL_NO_CHECK) != 0; 1975 coder->tell_unsupported_check 1976 = (options->flags & LZMA_TELL_UNSUPPORTED_CHECK) != 0; 1977 coder->tell_any_check = (options->flags & LZMA_TELL_ANY_CHECK) != 0; 1978 coder->ignore_check = (options->flags & LZMA_IGNORE_CHECK) != 0; 1979 coder->concatenated = (options->flags & LZMA_CONCATENATED) != 0; 1980 coder->fail_fast = (options->flags & LZMA_FAIL_FAST) != 0; 1981 1982 coder->first_stream = true; 1983 coder->out_was_filled = false; 1984 coder->pos = 0; 1985 1986 coder->threads_max = options->threads; 1987 1988 return_if_error(lzma_outq_init(&coder->outq, allocator, 1989 coder->threads_max)); 1990 1991 return stream_decoder_reset(coder, allocator); 1992 } 1993 1994 1995 extern LZMA_API(lzma_ret) 1996 lzma_stream_decoder_mt(lzma_stream *strm, const lzma_mt *options) 1997 { 1998 lzma_next_strm_init(stream_decoder_mt_init, strm, options); 1999 2000 strm->internal->supported_actions[LZMA_RUN] = true; 2001 strm->internal->supported_actions[LZMA_FINISH] = true; 2002 2003 return LZMA_OK; 2004 } 2005