// SPDX-License-Identifier: 0BSD

///////////////////////////////////////////////////////////////////////////////
//
/// \file       stream_encoder_mt.c
/// \brief      Multithreaded .xz Stream encoder
//
//  Author:     Lasse Collin
//
///////////////////////////////////////////////////////////////////////////////

#include "filter_encoder.h"
#include "easy_preset.h"
#include "block_encoder.h"
#include "block_buffer_encoder.h"
#include "index_encoder.h"
#include "outqueue.h"


/// Maximum supported block size. This makes it simpler to prevent integer
/// overflows if we are given an unusually large block size.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)


/// State of a worker thread.
///
/// The numeric order of the values matters: the code in this file compares
/// states with "<= THR_FINISH" (there is encoding to do) and
/// ">= THR_STOP" (abandon the current Block).
typedef enum {
	/// Waiting for work.
	THR_IDLE,

	/// Encoding is in progress.
	THR_RUN,

	/// Encoding is in progress but no more input data will
	/// be read.
	THR_FINISH,

	/// The main thread wants the thread to stop whatever it was doing
	/// but not exit.
	THR_STOP,

	/// The main thread wants the thread to exit. We could use
	/// cancellation but since there's THR_STOP anyway, this is lazier.
	THR_EXIT,

} worker_state;

typedef struct lzma_stream_coder_s lzma_stream_coder;

typedef struct worker_thread_s worker_thread;

/// Per-worker-thread data. One structure exists for each worker thread
/// that may be created (see lzma_stream_coder_s.threads).
struct worker_thread_s {
	/// Current state of this worker. After the thread has been created,
	/// the code in this file reads and writes it while holding "mutex".
	worker_state state;

	/// Input buffer of coder->block_size bytes. The main thread will
	/// put new input into this and update in_size accordingly. Once
	/// no more input is coming, state will be set to THR_FINISH.
	uint8_t *in;

	/// Amount of data available in the input buffer. This is modified
	/// only by the main thread.
	size_t in_size;

	/// Output buffer for this thread. This is set by the main
	/// thread every time a new Block is started with this thread
	/// structure.
	lzma_outbuf *outbuf;

	/// Pointer to the main structure is needed when putting this
	/// thread back to the stack of free threads.
	lzma_stream_coder *coder;

	/// The allocator is set by the main thread. Since a copy of the
	/// pointer is kept here, the application must not change the
	/// allocator before calling lzma_end().
	const lzma_allocator *allocator;

	/// Amount of uncompressed data that has already been compressed.
	uint64_t progress_in;

	/// Amount of compressed data that is ready.
	uint64_t progress_out;

	/// Block encoder
	lzma_next_coder block_encoder;

	/// Compression options for this Block
	lzma_block block_options;

	/// Filter chain for this thread. By copying the filters array
	/// to each thread it is possible to change the filter chain
	/// between Blocks using lzma_filters_update().
	lzma_filter filters[LZMA_FILTERS_MAX + 1];

	/// Next structure in the stack of free worker threads.
	worker_thread *next;

	/// Mutex and condition variable used for the hand-off of "state"
	/// and "in_size" between the main thread and this worker.
	mythread_mutex mutex;
	mythread_cond cond;

	/// The ID of this thread is used to join the thread
	/// when it's not needed anymore.
	mythread thread_id;
};


/// State of the multithreaded Stream encoder (owned by the main thread,
/// with the parts noted below shared with the worker threads).
struct lzma_stream_coder_s {
	/// Which part of the .xz Stream stream_encode_mt() is
	/// currently producing.
	enum {
		SEQ_STREAM_HEADER,
		SEQ_BLOCK,
		SEQ_INDEX,
		SEQ_STREAM_FOOTER,
	} sequence;

	/// Start a new Block every block_size bytes of input unless
	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
	size_t block_size;

	/// The filter chain to use for the next Block.
	/// This can be updated using lzma_filters_update()
	/// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
	lzma_filter filters[LZMA_FILTERS_MAX + 1];

	/// A copy of filters[] will be put here when attempting to get
	/// a new worker thread. This will be copied to a worker thread
	/// when a thread becomes free and then this cache is marked as
	/// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
	/// the filter options from filters[] would get uselessly copied
	/// multiple times (allocated and freed) when waiting for a new free
	/// worker thread.
	///
	/// This is freed if filters[] is updated via lzma_filters_update().
	lzma_filter filters_cache[LZMA_FILTERS_MAX + 1];


	/// Index to hold sizes of the Blocks
	lzma_index *index;

	/// Index encoder
	lzma_next_coder index_encoder;


	/// Stream Flags for encoding the Stream Header and Stream Footer.
	lzma_stream_flags stream_flags;

	/// Buffer to hold Stream Header and Stream Footer.
	uint8_t header[LZMA_STREAM_HEADER_SIZE];

	/// Read position in header[]
	size_t header_pos;


	/// Output buffer queue for compressed data
	lzma_outq outq;

	/// How much memory to allocate for each lzma_outbuf.buf
	size_t outbuf_alloc_size;


	/// Maximum wait time if cannot use all the input and cannot
	/// fill the output buffer. This is in milliseconds.
	uint32_t timeout;


	/// Error code from a worker thread
	lzma_ret thread_error;

	/// Array of allocated thread-specific structures
	worker_thread *threads;

	/// Number of structures in "threads" above. This is also the
	/// number of threads that will be created at maximum.
	uint32_t threads_max;

	/// Number of thread structures that have been initialized, and
	/// thus the number of worker threads actually created so far.
	uint32_t threads_initialized;

	/// Stack of free threads. When a thread finishes, it puts itself
	/// back into this stack. This starts as empty because threads
	/// are created only when actually needed.
	worker_thread *threads_free;

	/// The most recent worker thread to which the main thread writes
	/// the new input from the application.
	worker_thread *thr;


	/// Amount of uncompressed data in Blocks that have already
	/// been finished.
	uint64_t progress_in;

	/// Amount of compressed data in Stream Header + Blocks that
	/// have already been finished.
	uint64_t progress_out;


	/// Mutex and condition variable shared by the main thread and all
	/// workers. In this file they are held/signaled when touching
	/// thread_error, threads_free, the progress counters, and when
	/// a worker publishes a finished output buffer.
	mythread_mutex mutex;
	mythread_cond cond;
};


/// Tell the main thread that something has gone wrong.
///
/// The error code is stored in coder->thread_error only if no error has
/// been stored there yet, so the first reported error is the one that is
/// kept. The main thread is then woken up via coder->cond.
///
/// \param      thr     Worker thread reporting the error
/// \param      ret     Error code; must not be LZMA_OK or LZMA_STREAM_END
static void
worker_error(worker_thread *thr, lzma_ret ret)
{
	assert(ret != LZMA_OK);
	assert(ret != LZMA_STREAM_END);

	mythread_sync(thr->coder->mutex) {
		if (thr->coder->thread_error == LZMA_OK)
			thr->coder->thread_error = ret;

		mythread_cond_signal(&thr->coder->cond);
	}

	return;
}


/// Encode one Block in a worker thread.
///
/// Input is read from thr->in as the main thread makes it available and
/// the output is written to thr->outbuf->buf.
///
/// \param      thr     Worker thread structure
/// \param      out_pos Set to the write position in thr->outbuf->buf
/// \param      state   State value as last seen by the caller; it is
///                     refreshed from thr->state inside this function
///
/// \return     THR_FINISH if the Block was encoded successfully,
///             THR_STOP if an error occurred (it has then been reported
///             with worker_error()), or the value of thr->state if the
///             main thread requested THR_STOP or THR_EXIT.
static worker_state
worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
{
	assert(thr->progress_in == 0);
	assert(thr->progress_out == 0);

	// Set the Block options.
	thr->block_options = (lzma_block){
		.version = 0,
		.check = thr->coder->stream_flags.check,
		.compressed_size = thr->outbuf->allocated,
		.uncompressed_size = thr->coder->block_size,
		.filters = thr->filters,
	};

	// Calculate maximum size of the Block Header. This amount is
	// reserved in the beginning of the buffer so that Block Header
	// along with Compressed Size and Uncompressed Size can be
	// written there.
	lzma_ret ret = lzma_block_header_size(&thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Initialize the Block encoder.
	ret = lzma_block_encoder_init(&thr->block_encoder,
			thr->allocator, &thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	size_t in_pos = 0;
	size_t in_size = 0;

	// The compressed data starts after the space reserved for
	// the Block Header.
	*out_pos = thr->block_options.header_size;
	const size_t out_size = thr->outbuf->allocated;

	do {
		mythread_sync(thr->mutex) {
			// Store in_pos and *out_pos into *thr so that
			// an application may read them via
			// lzma_get_progress() to get progress information.
			//
			// NOTE: These aren't updated when the encoding
			// finishes. Instead, the final values are taken
			// later from thr->outbuf.
			thr->progress_in = in_pos;
			thr->progress_out = *out_pos;

			// Sleep until the main thread has provided more
			// input or has changed the state.
			while (in_size == thr->in_size
					&& thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		// Return if we were asked to stop or exit.
		if (state >= THR_STOP)
			return state;

		lzma_action action = state == THR_FINISH
				? LZMA_FINISH : LZMA_RUN;

		// Limit the amount of input given to the Block encoder
		// at once. This way this thread can react fairly quickly
		// if the main thread wants us to stop or exit.
		static const size_t in_chunk_max = 16384;
		size_t in_limit = in_size;
		if (in_size - in_pos > in_chunk_max) {
			in_limit = in_pos + in_chunk_max;

			// Not all of the available input is given to the
			// encoder on this round, so it must not finish yet.
			action = LZMA_RUN;
		}

		ret = thr->block_encoder.code(
				thr->block_encoder.coder, thr->allocator,
				thr->in, &in_pos, in_limit, thr->outbuf->buf,
				out_pos, out_size, action);
	} while (ret == LZMA_OK && *out_pos < out_size);

	switch (ret) {
	case LZMA_STREAM_END:
		assert(state == THR_FINISH);

		// Encode the Block Header. By doing it after
		// the compression, we can store the Compressed Size
		// and Uncompressed Size fields.
		ret = lzma_block_header_encode(&thr->block_options,
				thr->outbuf->buf);
		if (ret != LZMA_OK) {
			worker_error(thr, ret);
			return THR_STOP;
		}

		break;

	case LZMA_OK:
		// The loop above ended with LZMA_OK, which means that the
		// output buffer became full (*out_pos reached out_size).
		//
		// The data was incompressible. Encode it using uncompressed
		// LZMA2 chunks.
		//
		// First wait until we have gotten all the input.
		mythread_sync(thr->mutex) {
			while (thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		if (state >= THR_STOP)
			return state;

		// Do the encoding. This takes care of the Block Header too.
		*out_pos = 0;
		ret = lzma_block_uncomp_encode(&thr->block_options,
				thr->in, in_size, thr->outbuf->buf,
				out_pos, out_size);

		// It shouldn't fail.
		if (ret != LZMA_OK) {
			worker_error(thr, LZMA_PROG_ERROR);
			return THR_STOP;
		}

		break;

	default:
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Set the size information that will be read by the main thread
	// to write the Index field.
	thr->outbuf->unpadded_size
			= lzma_block_unpadded_size(&thr->block_options);
	assert(thr->outbuf->unpadded_size != 0);
	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

	return THR_FINISH;
}


/// Entry point of a worker thread.
///
/// The thread loops: it waits until its state is something else than
/// THR_IDLE, encodes a Block with worker_encode(), marks itself idle,
/// publishes the result to the main thread, and puts itself back to the
/// stack of free threads. The loop ends when the state is THR_EXIT, after
/// which the thread frees its own resources.
///
/// \param      thr_ptr Pointer to the worker_thread structure of this thread
static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
	worker_thread *thr = thr_ptr;
	worker_state state = THR_IDLE; // Init to silence a warning

	while (true) {
		// Wait for work.
		mythread_sync(thr->mutex) {
			while (true) {
				// The thread is already idle so if we are
				// requested to stop, just set the state.
				if (thr->state == THR_STOP) {
					thr->state = THR_IDLE;
					mythread_cond_signal(&thr->cond);
				}

				state = thr->state;
				if (state != THR_IDLE)
					break;

				mythread_cond_wait(&thr->cond, &thr->mutex);
			}
		}

		size_t out_pos = 0;

		assert(state != THR_IDLE);
		assert(state != THR_STOP);

		// THR_RUN or THR_FINISH: there is a Block to encode.
		if (state <= THR_FINISH)
			state = worker_encode(thr, &out_pos, state);

		if (state == THR_EXIT)
			break;

		// Mark the thread as idle unless the main thread has
		// told us to exit. Signal is needed for the case
		// where the main thread is waiting for the threads to stop.
		mythread_sync(thr->mutex) {
			if (thr->state != THR_EXIT) {
				thr->state = THR_IDLE;
				mythread_cond_signal(&thr->cond);
			}
		}

		mythread_sync(thr->coder->mutex) {
			// If no errors occurred, make the encoded data
			// available to be copied out.
			if (state == THR_FINISH) {
				thr->outbuf->pos = out_pos;
				thr->outbuf->finished = true;
			}

			// Update the main progress info.
			thr->coder->progress_in
					+= thr->outbuf->uncompressed_size;
			thr->coder->progress_out += out_pos;
			thr->progress_in = 0;
			thr->progress_out = 0;

			// Return this thread to the stack of free threads.
			thr->next = thr->coder->threads_free;
			thr->coder->threads_free = thr;

			mythread_cond_signal(&thr->coder->cond);
		}
	}

	// Exiting, free the resources.
	lzma_filters_free(thr->filters, thr->allocator);

	mythread_mutex_destroy(&thr->mutex);
	mythread_cond_destroy(&thr->cond);

	lzma_next_end(&thr->block_encoder, thr->allocator);
	lzma_free(thr->in, thr->allocator);
	return MYTHREAD_RET_VALUE;
}


/// Make the threads stop but not exit. Optionally wait for them to stop.
///
/// \param      coder             Stream encoder whose threads are stopped
/// \param      wait_for_threads  If true, return only after every created
///                               thread has set its state to THR_IDLE
static void
threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
{
	// Tell the threads to stop.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_STOP;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	if (!wait_for_threads)
		return;

	// Wait for the threads to settle in the idle state.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			while (coder->threads[i].state != THR_IDLE)
				mythread_cond_wait(&coder->threads[i].cond,
						&coder->threads[i].mutex);
		}
	}

	return;
}


/// Stop the threads and free the resources associated with them.
/// Wait until the threads have exited.
static void
threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
	// Tell every created thread to exit.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_EXIT;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	// Wait until the threads have terminated.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		int ret = mythread_join(coder->threads[i].thread_id);
		assert(ret == 0);
		(void)ret;
	}

	// Only the array is freed here. Each worker frees its own
	// per-thread resources before it returns from worker_start().
	lzma_free(coder->threads, allocator);
	return;
}


/// Initialize a new worker_thread structure and create a new thread.
///
/// The structure used is coder->threads[coder->threads_initialized].
/// On success coder->threads_initialized is incremented and coder->thr
/// is set to point to the new structure.
///
/// \return     LZMA_OK on success. LZMA_MEM_ERROR if allocating the input
///             buffer, initializing the mutex or the condition variable,
///             or creating the thread fails; what was initialized before
///             the failure is released.
static lzma_ret
initialize_new_thread(lzma_stream_coder *coder,
		const lzma_allocator *allocator)
{
	worker_thread *thr = &coder->threads[coder->threads_initialized];

	thr->in = lzma_alloc(coder->block_size, allocator);
	if (thr->in == NULL)
		return LZMA_MEM_ERROR;

	if (mythread_mutex_init(&thr->mutex))
		goto error_mutex;

	if (mythread_cond_init(&thr->cond))
		goto error_cond;

	thr->state = THR_IDLE;
	thr->allocator = allocator;
	thr->coder = coder;
	thr->progress_in = 0;
	thr->progress_out = 0;
	thr->block_encoder = LZMA_NEXT_CODER_INIT;
	thr->filters[0].id = LZMA_VLI_UNKNOWN;

	if (mythread_create(&thr->thread_id, &worker_start, thr))
		goto error_thread;

	++coder->threads_initialized;
	coder->thr = thr;

	return LZMA_OK;

	// The labels below undo the initializations in reverse order;
	// each label falls through to the next one.
error_thread:
	mythread_cond_destroy(&thr->cond);

error_cond:
	mythread_mutex_destroy(&thr->mutex);

error_mutex:
	lzma_free(thr->in, allocator);
	return LZMA_MEM_ERROR;
}


/// Try to get a worker thread for the next Block and store it in coder->thr.
///
/// A free thread is taken from coder->threads_free if there is one.
/// Otherwise a new thread is created if fewer than coder->threads_max
/// threads exist. The thread is then set to THR_RUN with an empty input
/// buffer, a new output buffer, and the filter chain from
/// coder->filters_cache.
///
/// \return     LZMA_OK is returned also when no thread could be obtained
///             (no output buffer is available or all threads are busy);
///             the caller has to check if coder->thr is still NULL.
///             An error code is returned if an allocation or the thread
///             creation fails.
static lzma_ret
get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
	// If there are no free output subqueues, there is no
	// point to try getting a thread.
	if (!lzma_outq_has_buf(&coder->outq))
		return LZMA_OK;

	// That's also true if we cannot allocate memory for the output
	// buffer in the output queue.
	return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
			coder->outbuf_alloc_size));

	// Make a thread-specific copy of the filter chain. Put it in
	// the cache array first so that if we cannot get a new thread yet,
	// the allocation is ready when we try again.
	if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN)
		return_if_error(lzma_filters_copy(
			coder->filters, coder->filters_cache, allocator));

	// If there is a free structure on the stack, use it.
	mythread_sync(coder->mutex) {
		if (coder->threads_free != NULL) {
			coder->thr = coder->threads_free;
			coder->threads_free = coder->threads_free->next;
		}
	}

	if (coder->thr == NULL) {
		// If there are no uninitialized structures left, return.
		if (coder->threads_initialized == coder->threads_max)
			return LZMA_OK;

		// Initialize a new thread.
		return_if_error(initialize_new_thread(coder, allocator));
	}

	// Reset the parts of the thread state that have to be done
	// in the main thread.
	mythread_sync(coder->thr->mutex) {
		coder->thr->state = THR_RUN;
		coder->thr->in_size = 0;
		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);

		// Free the old thread-specific filter options and replace
		// them with the already-allocated new options from
		// coder->filters_cache[]. Then mark the cache as empty.
		lzma_filters_free(coder->thr->filters, allocator);
		memcpy(coder->thr->filters, coder->filters_cache,
				sizeof(coder->filters_cache));
		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;

		mythread_cond_signal(&coder->thr->cond);
	}

	return LZMA_OK;
}


/// Copy input from the application to the worker threads.
///
/// The input is copied to the buffer of coder->thr, getting a new thread
/// with get_thread() when needed. A thread is told to finish its Block
/// when its buffer holds coder->block_size bytes or when all input has
/// been used and action isn't LZMA_RUN.
///
/// \return     LZMA_OK if all input was consumed or if no thread was
///             available for the remaining input. An error code if
///             get_thread() fails or if the current worker thread has
///             failed (the value of coder->thread_error).
static lzma_ret
stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, lzma_action action)
{
	// Loop as long as there is input left. Loop also when there is
	// a thread with an unfinished Block and action isn't LZMA_RUN
	// because then the thread has to be told to finish.
	while (*in_pos < in_size
			|| (coder->thr != NULL && action != LZMA_RUN)) {
		if (coder->thr == NULL) {
			// Get a new thread.
			const lzma_ret ret = get_thread(coder, allocator);

			// get_thread() may return LZMA_OK without
			// providing a thread, so coder->thr is what
			// has to be checked here.
			if (coder->thr == NULL)
				return ret;
		}

		// Copy the input data to thread's buffer.
		size_t thr_in_size = coder->thr->in_size;
		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
				&thr_in_size, coder->block_size);

		// Tell the Block encoder to finish if
		//  - it has got block_size bytes of input; or
		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
		//    or LZMA_FULL_BARRIER was used.
		//
		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
		const bool finish = thr_in_size == coder->block_size
				|| (*in_pos == in_size && action != LZMA_RUN);

		bool block_error = false;

		mythread_sync(coder->thr->mutex) {
			if (coder->thr->state == THR_IDLE) {
				// Something has gone wrong with the Block
				// encoder. It has set coder->thread_error
				// which we will read a few lines later.
				block_error = true;
			} else {
				// Tell the Block encoder its new amount
				// of input and update the state if needed.
				coder->thr->in_size = thr_in_size;

				if (finish)
					coder->thr->state = THR_FINISH;

				mythread_cond_signal(&coder->thr->cond);
			}
		}

		if (block_error) {
			lzma_ret ret = LZMA_OK; // Init to silence a warning.

			mythread_sync(coder->mutex) {
				ret = coder->thread_error;
			}

			return ret;
		}

		// The Block of this thread has got all of its input, so
		// a new thread is needed for the input that follows.
		if (finish)
			coder->thr = NULL;
	}

	return LZMA_OK;
}


/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached.
///
/// \param      coder       Stream encoder
/// \param      wait_abs    Absolute time of the timeout; it is set here
///                         when *has_blocked is false and a timeout is
///                         in use
/// \param      has_blocked Set to true when wait_abs gets set
/// \param      has_input   True if there is unused input available
///
/// \return     true if the timeout was reached, false otherwise
static bool
wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
		bool *has_blocked, bool has_input)
{
	if (coder->timeout != 0 && !*has_blocked) {
		// Every time when stream_encode_mt() is called via
		// lzma_code(), *has_blocked starts as false. We set it
		// to true here and calculate the absolute time when
		// we must return if there's nothing to do.
		//
		// This way if we block multiple times for short moments
		// less than "timeout" milliseconds, we will return once
		// "timeout" amount of time has passed since the *first*
		// blocking occurred. If the absolute time was calculated
		// again every time we block, "timeout" would effectively
		// be meaningless if we never consecutively block longer
		// than "timeout" ms.
		*has_blocked = true;
		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
	}

	bool timed_out = false;

	mythread_sync(coder->mutex) {
		// There are four things that we wait for. If one of them
		// becomes possible, we return.
		//  - If there is input left, we need to get a free
		//    worker thread and an output buffer for it.
		//  - Data ready to be read from the output queue.
		//  - A worker thread indicates an error.
		//  - Time out occurs.
		while ((!has_input || coder->threads_free == NULL
					|| !lzma_outq_has_buf(&coder->outq))
				&& !lzma_outq_is_readable(&coder->outq)
				&& coder->thread_error == LZMA_OK
				&& !timed_out) {
			if (coder->timeout != 0)
				timed_out = mythread_cond_timedwait(
						&coder->cond, &coder->mutex,
						wait_abs) != 0;
			else
				mythread_cond_wait(&coder->cond,
						&coder->mutex);
		}
	}

	return timed_out;
}


/// The main coding function of the multithreaded Stream encoder.
///
/// coder->sequence tells which part of the Stream is being produced:
/// the Stream Header, the Blocks, the Index, or the Stream Footer.
/// In SEQ_BLOCK the loop reads finished output from the output queue,
/// appends finished Blocks to the Index, gives new input to the worker
/// threads, and waits when there is nothing that can be done.
///
/// \return     LZMA_OK when more input or more output space is needed,
///             LZMA_STREAM_END when LZMA_FULL_FLUSH, LZMA_FULL_BARRIER,
///             or LZMA_FINISH has been completed, LZMA_TIMED_OUT if
///             wait_for_work() timed out, or an error code. The worker
///             threads are told to stop if an error occurs while
///             encoding the Blocks.
static lzma_ret
stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, uint8_t *restrict out,
		size_t *restrict out_pos, size_t out_size, lzma_action action)
{
	lzma_stream_coder *coder = coder_ptr;

	switch (coder->sequence) {
	case SEQ_STREAM_HEADER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		if (coder->header_pos < sizeof(coder->header))
			return LZMA_OK;

		// header[] and header_pos are used again for
		// the Stream Footer.
		coder->header_pos = 0;
		coder->sequence = SEQ_BLOCK;

	// Fall through

	case SEQ_BLOCK: {
		// Initialized to silence warnings.
		lzma_vli unpadded_size = 0;
		lzma_vli uncompressed_size = 0;
		lzma_ret ret = LZMA_OK;

		// These are for wait_for_work().
		bool has_blocked = false;
		mythread_condtime wait_abs = { 0 };

		while (true) {
			mythread_sync(coder->mutex) {
				// Check for Block encoder errors.
				ret = coder->thread_error;
				if (ret != LZMA_OK) {
					assert(ret != LZMA_STREAM_END);
					break; // Break out of mythread_sync.
				}

				// Try to read compressed data to out[].
				ret = lzma_outq_read(&coder->outq, allocator,
						out, out_pos, out_size,
						&unpadded_size,
						&uncompressed_size);
			}

			if (ret == LZMA_STREAM_END) {
				// End of Block. Add it to the Index.
				ret = lzma_index_append(coder->index,
						allocator, unpadded_size,
						uncompressed_size);
				if (ret != LZMA_OK) {
					threads_stop(coder, false);
					return ret;
				}

				// If we didn't fill the output buffer yet,
				// try to read more data. Maybe the next
				// outbuf has been finished already too.
				if (*out_pos < out_size)
					continue;
			}

			if (ret != LZMA_OK) {
				// coder->thread_error was set.
				threads_stop(coder, false);
				return ret;
			}

			// Try to give uncompressed data to a worker thread.
			ret = stream_encode_in(coder, allocator,
					in, in_pos, in_size, action);
			if (ret != LZMA_OK) {
				threads_stop(coder, false);
				return ret;
			}

			// See if we should wait or return.
			//
			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
			if (*in_pos == in_size) {
				// LZMA_RUN: More data is probably coming
				// so return to let the caller fill the
				// input buffer.
				if (action == LZMA_RUN)
					return LZMA_OK;

				// LZMA_FULL_BARRIER: The same as with
				// LZMA_RUN but tell the caller that the
				// barrier was completed.
				if (action == LZMA_FULL_BARRIER)
					return LZMA_STREAM_END;

				// Finishing or flushing isn't completed until
				// all input data has been encoded and copied
				// to the output buffer.
				if (lzma_outq_is_empty(&coder->outq)) {
					// LZMA_FINISH: Continue to encode
					// the Index field.
					if (action == LZMA_FINISH)
						break;

					// LZMA_FULL_FLUSH: Return to tell
					// the caller that flushing was
					// completed.
					if (action == LZMA_FULL_FLUSH)
						return LZMA_STREAM_END;
				}
			}

			// Return if there is no output space left.
			// This check must be done after testing the input
			// buffer, because we might want to use a different
			// return code.
			if (*out_pos == out_size)
				return LZMA_OK;

			// Neither in nor out has been used completely.
			// Wait until there's something we can do.
			if (wait_for_work(coder, &wait_abs, &has_blocked,
					*in_pos < in_size))
				return LZMA_TIMED_OUT;
		}

		// All Blocks have been encoded and the threads have stopped.
		// Prepare to encode the Index field.
		return_if_error(lzma_index_encoder_init(
				&coder->index_encoder, allocator,
				coder->index));
		coder->sequence = SEQ_INDEX;

		// Update the progress info to take the Index and
		// Stream Footer into account. Those are very fast to encode
		// so in terms of progress information they can be thought
		// to be ready to be copied out.
		coder->progress_out += lzma_index_size(coder->index)
				+ LZMA_STREAM_HEADER_SIZE;
	}

	// Fall through

	case SEQ_INDEX: {
		// Call the Index encoder. It doesn't take any input, so
		// those pointers can be NULL.
		const lzma_ret ret = coder->index_encoder.code(
				coder->index_encoder.coder, allocator,
				NULL, NULL, 0,
				out, out_pos, out_size, LZMA_RUN);
		if (ret != LZMA_STREAM_END)
			return ret;

		// Encode the Stream Footer into coder->header.
		coder->stream_flags.backward_size
				= lzma_index_size(coder->index);
		if (lzma_stream_footer_encode(&coder->stream_flags,
				coder->header) != LZMA_OK)
			return LZMA_PROG_ERROR;

		coder->sequence = SEQ_STREAM_FOOTER;
	}

	// Fall through

	case SEQ_STREAM_FOOTER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		return coder->header_pos < sizeof(coder->header)
				? LZMA_OK : LZMA_STREAM_END;
	}

	// Every case above returns or falls through to a case that
	// returns, so this is reached only if coder->sequence is invalid.
	assert(0);
	return LZMA_PROG_ERROR;
}


/// Free the Stream encoder: end the worker threads, then free the output
/// queue, the filter chains, the Index encoder, the Index, the mutex and
/// the condition variable of the coder, and finally the coder itself.
static void
stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
{
	lzma_stream_coder *coder = coder_ptr;

	// Threads must be killed before the output queue can be freed.
	threads_end(coder, allocator);
	lzma_outq_end(&coder->outq, allocator);

	lzma_filters_free(coder->filters, allocator);
	lzma_filters_free(coder->filters_cache, allocator);

	lzma_next_end(&coder->index_encoder, allocator);
	lzma_index_end(coder->index, allocator);

	mythread_cond_destroy(&coder->cond);
	mythread_mutex_destroy(&coder->mutex);

	lzma_free(coder, allocator);
	return;
}


/// Change the filter chain that will be used for the Blocks that are
/// started after this call.
///
/// \return     LZMA_OK on success. LZMA_PROG_ERROR if the Index or the
///             Stream Footer is already being encoded or if a Block is
///             currently being filled (coder->thr != NULL).
///             LZMA_OPTIONS_ERROR if the new chain is rejected by
///             lzma_raw_encoder_memusage(). An error code from
///             lzma_filters_copy() if copying the chain fails; the old
///             chain is kept in that case.
static lzma_ret
stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator,
		const lzma_filter *filters,
		const lzma_filter *reversed_filters
			lzma_attribute((__unused__)))
{
	lzma_stream_coder *coder = coder_ptr;

	// Applications shouldn't attempt to change the options when
	// we are already encoding the Index or Stream Footer.
	if (coder->sequence > SEQ_BLOCK)
		return LZMA_PROG_ERROR;

	// For now the threaded encoder doesn't support changing
	// the options in the middle of a Block.
	if (coder->thr != NULL)
		return LZMA_PROG_ERROR;

	// Check if the filter chain seems mostly valid. See the comment
	// in stream_encoder_mt_init().
	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Make a copy to a temporary buffer first. This way the encoder
	// state stays unchanged if an error occurs in lzma_filters_copy().
	lzma_filter temp[LZMA_FILTERS_MAX + 1];
	return_if_error(lzma_filters_copy(filters, temp, allocator));

	// Free the options of the old chain as well as the cache.
	lzma_filters_free(coder->filters, allocator);
	lzma_filters_free(coder->filters_cache, allocator);

	// Copy the new filter chain in place.
	memcpy(coder->filters, temp, sizeof(temp));

	return LZMA_OK;
}


/// Options handling for lzma_stream_encoder_mt_init() and
/// lzma_stream_encoder_mt_memusage()
///
/// \param      options         Options from the application
/// \param      opt_easy        Storage for the preset's filter chain; used
///                             only if options->filters is NULL
/// \param      filters         Set to the filter chain to use
/// \param      block_size      Set to the Block size to use
/// \param      outbuf_size_max Set to the maximum size of one encoded Block
///
/// \return     LZMA_OK on success. LZMA_PROG_ERROR if options is NULL.
///             LZMA_OPTIONS_ERROR if the flags, the number of threads,
///             the preset, or the Block size isn't acceptable.
///             LZMA_MEM_ERROR if lzma_block_buffer_bound64() returns zero
///             for the Block size.
static lzma_ret
get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
		const lzma_filter **filters, uint64_t *block_size,
		uint64_t *outbuf_size_max)
{
	// Validate some of the options.
	if (options == NULL)
		return LZMA_PROG_ERROR;

	if (options->flags != 0 || options->threads == 0
			|| options->threads > LZMA_THREADS_MAX)
		return LZMA_OPTIONS_ERROR;

	if (options->filters != NULL) {
		// Filter chain was given, use it as is.
		*filters = options->filters;
	} else {
		// Use a preset.
		if (lzma_easy_preset(opt_easy, options->preset))
			return LZMA_OPTIONS_ERROR;

		*filters = opt_easy->filters;
	}

	// If the Block size is not set, determine it from the filter chain.
	if (options->block_size > 0)
		*block_size = options->block_size;
	else
		*block_size = lzma_mt_block_size(*filters);

	// UINT64_MAX > BLOCK_SIZE_MAX, so the second condition
	// should be optimized out by any reasonable compiler.
	// The second condition should be there in the unlikely event that
	// the macros change and UINT64_MAX < BLOCK_SIZE_MAX.
	if (*block_size > BLOCK_SIZE_MAX || *block_size == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Calculate the maximum amount of output that a single output
	// buffer may need to hold. This is the same as the maximum total
	// size of a Block.
	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
	if (*outbuf_size_max == 0)
		return LZMA_MEM_ERROR;

	return LZMA_OK;
}


/// Get the progress information: the totals of the finished Blocks from
/// the coder plus the progress of each created worker thread.
///
/// \param      coder_ptr     Pointer to the lzma_stream_coder
/// \param      progress_in   Set to the amount of uncompressed data
/// \param      progress_out  Set to the amount of compressed data
static void
get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
{
	lzma_stream_coder *coder = coder_ptr;

	// Lock coder->mutex to prevent finishing threads from moving their
	// progress info from the worker_thread structure to lzma_stream_coder.
	mythread_sync(coder->mutex) {
		*progress_in = coder->progress_in;
		*progress_out = coder->progress_out;

		for (size_t i = 0; i < coder->threads_initialized; ++i) {
			mythread_sync(coder->threads[i].mutex) {
				*progress_in += coder->threads[i].progress_in;
				*progress_out += coder->threads[i]
						.progress_out;
			}
		}
	}

	return;
}


/// Initialize or reinitialize the multithreaded Stream encoder.
///
/// The options are validated first. The base structure is allocated if
/// next->coder is NULL. If the number of threads differs from the previous
/// initialization, the old threads are ended and a new array of thread
/// structures is allocated; otherwise the existing threads are stopped
/// and reused. After that the output queue, the filter chain, the Index,
/// the Stream Header, and the progress information are set up.
///
/// \return     LZMA_OK on success or an error code.
static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
		const lzma_mt *options)
{
	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);

	// Get the filter chain.
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;
	return_if_error(get_options(options, &easy, &filters,
			&block_size, &outbuf_size_max));

#if SIZE_MAX < UINT64_MAX
	// The sizes are stored as size_t below so they have to fit in it.
	if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
		return LZMA_MEM_ERROR;
#endif

	// Validate the filter chain so that we can give an error in this
	// function instead of delaying it to the first call to lzma_code().
	// The memory usage calculation verifies the filter chain as
	// a side effect so we take advantage of that. It's not a perfect
	// check though as raw encoder allows LZMA1 too but such problems
	// will be caught eventually with Block Header encoder.
	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Validate the Check ID.
	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
		return LZMA_PROG_ERROR;

	if (!lzma_check_is_supported(options->check))
		return LZMA_UNSUPPORTED_CHECK;

	// Allocate and initialize the base structure if needed.
	lzma_stream_coder *coder = next->coder;
	if (coder == NULL) {
		coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
		if (coder == NULL)
			return LZMA_MEM_ERROR;

		next->coder = coder;

		// For the mutex and condition variable initializations
		// the error handling has to be done here because
		// stream_encoder_mt_end() doesn't know if they have
		// already been initialized or not.
		if (mythread_mutex_init(&coder->mutex)) {
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		if (mythread_cond_init(&coder->cond)) {
			mythread_mutex_destroy(&coder->mutex);
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		next->code = &stream_encode_mt;
		next->end = &stream_encoder_mt_end;
		next->get_progress = &get_progress;
		next->update = &stream_encoder_mt_update;

		// Set the members that the code below and
		// stream_encoder_mt_end() read before they are
		// otherwise assigned.
		coder->filters[0].id = LZMA_VLI_UNKNOWN;
		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
		coder->index_encoder = LZMA_NEXT_CODER_INIT;
		coder->index = NULL;
		memzero(&coder->outq, sizeof(coder->outq));
		coder->threads = NULL;
		coder->threads_max = 0;
		coder->threads_initialized = 0;
	}

	// Basic initializations
	coder->sequence = SEQ_STREAM_HEADER;
	coder->block_size = (size_t)(block_size);
	coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
	coder->thread_error = LZMA_OK;
	coder->thr = NULL;

	// Allocate the thread-specific base structures.
	assert(options->threads > 0);
	if (coder->threads_max != options->threads) {
		threads_end(coder, allocator);

		// Reset these before allocating so that the coder is in
		// a consistent state if the allocation fails.
		coder->threads = NULL;
		coder->threads_max = 0;

		coder->threads_initialized = 0;
		coder->threads_free = NULL;

		coder->threads = lzma_alloc(
				options->threads * sizeof(worker_thread),
				allocator);
		if (coder->threads == NULL)
			return LZMA_MEM_ERROR;

		coder->threads_max = options->threads;
	} else {
		// Reuse the old structures and threads. Tell the running
		// threads to stop and wait until they have stopped.
		threads_stop(coder, true);
	}

	// Output queue
	return_if_error(lzma_outq_init(&coder->outq, allocator,
			options->threads));

	// Timeout
	coder->timeout = options->timeout;

	// Free the old filter chain and the cache.
	lzma_filters_free(coder->filters, allocator);
	lzma_filters_free(coder->filters_cache, allocator);

	// Copy the new filter chain.
	return_if_error(lzma_filters_copy(
			filters, coder->filters, allocator));

	// Index
	lzma_index_end(coder->index, allocator);
	coder->index = lzma_index_init(allocator);
	if (coder->index == NULL)
		return LZMA_MEM_ERROR;

	// Stream Header
	coder->stream_flags.version = 0;
	coder->stream_flags.check = options->check;
	return_if_error(lzma_stream_header_encode(
			&coder->stream_flags, coder->header));

	coder->header_pos = 0;

	// Progress info
	coder->progress_in = 0;
	coder->progress_out = LZMA_STREAM_HEADER_SIZE;

	return LZMA_OK;
}


#ifdef HAVE_SYMBOL_VERSIONS_LINUX
// These are for compatibility with binaries linked against liblzma that
// has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
// Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.1.2alpha
// but it has been added here anyway since someone might misread the
// RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist.
LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha",
	lzma_ret, lzma_stream_encoder_mt_512a)(
		lzma_stream *strm, const lzma_mt *options)
		lzma_nothrow lzma_attr_warn_unused_result
		__attribute__((__alias__("lzma_stream_encoder_mt_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2",
	lzma_ret, lzma_stream_encoder_mt_522)(
		lzma_stream *strm, const lzma_mt *options)
		lzma_nothrow lzma_attr_warn_unused_result
		__attribute__((__alias__("lzma_stream_encoder_mt_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2",
	lzma_ret, lzma_stream_encoder_mt_52)(
		lzma_stream *strm, const lzma_mt *options)
		lzma_nothrow lzma_attr_warn_unused_result;

// From here on the function is defined under its versioned name.
#define lzma_stream_encoder_mt lzma_stream_encoder_mt_52
#endif
/// Public entry point: initialize strm as a multithreaded .xz Stream
/// encoder using stream_encoder_mt_init() and mark the actions that
/// lzma_code() may be called with.
///
/// NOTE(review): lzma_next_strm_init is defined elsewhere; presumably it
/// returns from this function if the initialization fails. Confirm from
/// its definition.
extern LZMA_API(lzma_ret)
lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
{
	lzma_next_strm_init(stream_encoder_mt_init, strm, options);

	// LZMA_SYNC_FLUSH is left disabled; see the TODO notes about
	// LZMA_SYNC_FLUSH in stream_encode_in() and stream_encode_mt().
	strm->internal->supported_actions[LZMA_RUN] = true;
// 	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
	strm->internal->supported_actions[LZMA_FINISH] = true;

	return LZMA_OK;
}


#ifdef HAVE_SYMBOL_VERSIONS_LINUX
LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha",
	uint64_t, lzma_stream_encoder_mt_memusage_512a)(
		const lzma_mt *options) lzma_nothrow lzma_attr_pure
		__attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2",
	uint64_t, lzma_stream_encoder_mt_memusage_522)(
		const lzma_mt *options)
lzma_nothrow lzma_attr_pure 1222 __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52"))); 1223 1224 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2", 1225 uint64_t, lzma_stream_encoder_mt_memusage_52)( 1226 const lzma_mt *options) lzma_nothrow lzma_attr_pure; 1227 1228 #define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52 1229 #endif 1230 // This function name is a monster but it's consistent with the older 1231 // monster names. :-( 31 chars is the max that C99 requires so in that 1232 // sense it's not too long. ;-) 1233 extern LZMA_API(uint64_t) 1234 lzma_stream_encoder_mt_memusage(const lzma_mt *options) 1235 { 1236 lzma_options_easy easy; 1237 const lzma_filter *filters; 1238 uint64_t block_size; 1239 uint64_t outbuf_size_max; 1240 1241 if (get_options(options, &easy, &filters, &block_size, 1242 &outbuf_size_max) != LZMA_OK) 1243 return UINT64_MAX; 1244 1245 // Memory usage of the input buffers 1246 const uint64_t inbuf_memusage = options->threads * block_size; 1247 1248 // Memory usage of the filter encoders 1249 uint64_t filters_memusage = lzma_raw_encoder_memusage(filters); 1250 if (filters_memusage == UINT64_MAX) 1251 return UINT64_MAX; 1252 1253 filters_memusage *= options->threads; 1254 1255 // Memory usage of the output queue 1256 const uint64_t outq_memusage = lzma_outq_memusage( 1257 outbuf_size_max, options->threads); 1258 if (outq_memusage == UINT64_MAX) 1259 return UINT64_MAX; 1260 1261 // Sum them with overflow checking. 
1262 uint64_t total_memusage = LZMA_MEMUSAGE_BASE 1263 + sizeof(lzma_stream_coder) 1264 + options->threads * sizeof(worker_thread); 1265 1266 if (UINT64_MAX - total_memusage < inbuf_memusage) 1267 return UINT64_MAX; 1268 1269 total_memusage += inbuf_memusage; 1270 1271 if (UINT64_MAX - total_memusage < filters_memusage) 1272 return UINT64_MAX; 1273 1274 total_memusage += filters_memusage; 1275 1276 if (UINT64_MAX - total_memusage < outq_memusage) 1277 return UINT64_MAX; 1278 1279 return total_memusage + outq_memusage; 1280 } 1281