///////////////////////////////////////////////////////////////////////////////
//
/// \file       stream_encoder_mt.c
/// \brief      Multithreaded .xz Stream encoder
//
//  Author:     Lasse Collin
//
//  This file has been put into the public domain.
//  You can do whatever you want with this file.
//
///////////////////////////////////////////////////////////////////////////////

#include "filter_encoder.h"
#include "easy_preset.h"
#include "block_encoder.h"
#include "block_buffer_encoder.h"
#include "index_encoder.h"
#include "outqueue.h"


/// Maximum supported block size. This makes it simpler to prevent integer
/// overflows if we are given unusually large block size.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)


typedef enum {
	/// Waiting for work.
	THR_IDLE,

	/// Encoding is in progress.
	THR_RUN,

	/// Encoding is in progress but no more input data will
	/// be read.
	THR_FINISH,

	/// The main thread wants the thread to stop whatever it was doing
	/// but not exit.
	THR_STOP,

	/// The main thread wants the thread to exit. We could use
	/// cancellation but since there's THR_STOP anyway, this is lazier.
	THR_EXIT,

} worker_state;

typedef struct lzma_stream_coder_s lzma_stream_coder;

typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
	/// Current state of this thread. Protected by this thread's mutex.
	worker_state state;

	/// Input buffer of coder->block_size bytes. The main thread will
	/// put new input into this and update in_size accordingly. Once
	/// no more input is coming, state will be set to THR_FINISH.
	uint8_t *in;

	/// Amount of data available in the input buffer. This is modified
	/// only by the main thread.
	size_t in_size;

	/// Output buffer for this thread. This is set by the main
	/// thread every time a new Block is started with this thread
	/// structure.
	lzma_outbuf *outbuf;

	/// Pointer to the main structure is needed when putting this
	/// thread back to the stack of free threads.
	lzma_stream_coder *coder;

	/// The allocator is set by the main thread. Since a copy of the
	/// pointer is kept here, the application must not change the
	/// allocator before calling lzma_end().
	const lzma_allocator *allocator;

	/// Amount of uncompressed data that has already been compressed.
	uint64_t progress_in;

	/// Amount of compressed data that is ready.
	uint64_t progress_out;

	/// Block encoder
	lzma_next_coder block_encoder;

	/// Compression options for this Block
	lzma_block block_options;

	/// Filter chain for this thread. By copying the filters array
	/// to each thread it is possible to change the filter chain
	/// between Blocks using lzma_filters_update().
	lzma_filter filters[LZMA_FILTERS_MAX + 1];

	/// Next structure in the stack of free worker threads.
	worker_thread *next;

	mythread_mutex mutex;
	mythread_cond cond;

	/// The ID of this thread is used to join the thread
	/// when it's not needed anymore.
	mythread thread_id;
};


struct lzma_stream_coder_s {
	/// Position in the .xz Stream being produced:
	/// Stream Header, Blocks, Index, Stream Footer, in that order.
	enum {
		SEQ_STREAM_HEADER,
		SEQ_BLOCK,
		SEQ_INDEX,
		SEQ_STREAM_FOOTER,
	} sequence;

	/// Start a new Block every block_size bytes of input unless
	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
	size_t block_size;

	/// The filter chain to use for the next Block.
	/// This can be updated using lzma_filters_update()
	/// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
	lzma_filter filters[LZMA_FILTERS_MAX + 1];

	/// A copy of filters[] will be put here when attempting to get
	/// a new worker thread. This will be copied to a worker thread
	/// when a thread becomes free and then this cache is marked as
	/// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
	/// the filter options from filters[] would get uselessly copied
	/// multiple times (allocated and freed) when waiting for a new free
	/// worker thread.
	///
	/// This is freed if filters[] is updated via lzma_filters_update().
	lzma_filter filters_cache[LZMA_FILTERS_MAX + 1];


	/// Index to hold sizes of the Blocks
	lzma_index *index;

	/// Index encoder
	lzma_next_coder index_encoder;


	/// Stream Flags for encoding the Stream Header and Stream Footer.
	lzma_stream_flags stream_flags;

	/// Buffer to hold Stream Header and Stream Footer.
	uint8_t header[LZMA_STREAM_HEADER_SIZE];

	/// Read position in header[]
	size_t header_pos;


	/// Output buffer queue for compressed data
	lzma_outq outq;

	/// How much memory to allocate for each lzma_outbuf.buf
	size_t outbuf_alloc_size;


	/// Maximum wait time if cannot use all the input and cannot
	/// fill the output buffer. This is in milliseconds.
	uint32_t timeout;


	/// Error code from a worker thread
	lzma_ret thread_error;

	/// Array of allocated thread-specific structures
	worker_thread *threads;

	/// Number of structures in "threads" above. This is also the
	/// number of threads that will be created at maximum.
	uint32_t threads_max;

	/// Number of thread structures that have been initialized, and
	/// thus the number of worker threads actually created so far.
	uint32_t threads_initialized;

	/// Stack of free threads. When a thread finishes, it puts itself
	/// back into this stack. This starts as empty because threads
	/// are created only when actually needed.
	worker_thread *threads_free;

	/// The most recent worker thread to which the main thread writes
	/// the new input from the application.
	worker_thread *thr;


	/// Amount of uncompressed data in Blocks that have already
	/// been finished.
	uint64_t progress_in;

	/// Amount of compressed data in Stream Header + Blocks that
	/// have already been finished.
	uint64_t progress_out;


	mythread_mutex mutex;
	mythread_cond cond;
};


/// Tell the main thread that something has gone wrong.
static void
worker_error(worker_thread *thr, lzma_ret ret)
{
	assert(ret != LZMA_OK);
	assert(ret != LZMA_STREAM_END);

	mythread_sync(thr->coder->mutex) {
		// Only the first error is remembered; later errors from
		// other threads don't overwrite it.
		if (thr->coder->thread_error == LZMA_OK)
			thr->coder->thread_error = ret;

		mythread_cond_signal(&thr->coder->cond);
	}

	return;
}


/// Encode one Block in a worker thread. Returns the state the thread
/// should move to: THR_FINISH on success, the requested state if the
/// main thread asked us to stop or exit, or THR_STOP after an error
/// (the error itself is reported via worker_error()).
static worker_state
worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
{
	assert(thr->progress_in == 0);
	assert(thr->progress_out == 0);

	// Set the Block options.
	thr->block_options = (lzma_block){
		.version = 0,
		.check = thr->coder->stream_flags.check,
		.compressed_size = thr->outbuf->allocated,
		.uncompressed_size = thr->coder->block_size,
		.filters = thr->filters,
	};

	// Calculate maximum size of the Block Header. This amount is
	// reserved in the beginning of the buffer so that Block Header
	// along with Compressed Size and Uncompressed Size can be
	// written there.
	lzma_ret ret = lzma_block_header_size(&thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Initialize the Block encoder.
	ret = lzma_block_encoder_init(&thr->block_encoder,
			thr->allocator, &thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	size_t in_pos = 0;
	size_t in_size = 0;

	*out_pos = thr->block_options.header_size;
	const size_t out_size = thr->outbuf->allocated;

	do {
		mythread_sync(thr->mutex) {
			// Store in_pos and *out_pos into *thr so that
			// an application may read them via
			// lzma_get_progress() to get progress information.
			//
			// NOTE: These aren't updated when the encoding
			// finishes. Instead, the final values are taken
			// later from thr->outbuf.
			thr->progress_in = in_pos;
			thr->progress_out = *out_pos;

			// Sleep until there is new input or the state
			// has been changed by the main thread.
			while (in_size == thr->in_size
					&& thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		// Return if we were asked to stop or exit.
		if (state >= THR_STOP)
			return state;

		lzma_action action = state == THR_FINISH
				? LZMA_FINISH : LZMA_RUN;

		// Limit the amount of input given to the Block encoder
		// at once. This way this thread can react fairly quickly
		// if the main thread wants us to stop or exit.
		static const size_t in_chunk_max = 16384;
		size_t in_limit = in_size;
		if (in_size - in_pos > in_chunk_max) {
			in_limit = in_pos + in_chunk_max;
			action = LZMA_RUN;
		}

		ret = thr->block_encoder.code(
				thr->block_encoder.coder, thr->allocator,
				thr->in, &in_pos, in_limit, thr->outbuf->buf,
				out_pos, out_size, action);
	} while (ret == LZMA_OK && *out_pos < out_size);

	switch (ret) {
	case LZMA_STREAM_END:
		assert(state == THR_FINISH);

		// Encode the Block Header. By doing it after
		// the compression, we can store the Compressed Size
		// and Uncompressed Size fields.
		ret = lzma_block_header_encode(&thr->block_options,
				thr->outbuf->buf);
		if (ret != LZMA_OK) {
			worker_error(thr, ret);
			return THR_STOP;
		}

		break;

	case LZMA_OK:
		// The data was incompressible. Encode it using uncompressed
		// LZMA2 chunks.
		//
		// First wait until we have gotten all the input.
		mythread_sync(thr->mutex) {
			while (thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		if (state >= THR_STOP)
			return state;

		// Do the encoding. This takes care of the Block Header too.
		*out_pos = 0;
		ret = lzma_block_uncomp_encode(&thr->block_options,
				thr->in, in_size, thr->outbuf->buf,
				out_pos, out_size);

		// It shouldn't fail.
		if (ret != LZMA_OK) {
			worker_error(thr, LZMA_PROG_ERROR);
			return THR_STOP;
		}

		break;

	default:
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Set the size information that will be read by the main thread
	// to write the Index field.
	thr->outbuf->unpadded_size
			= lzma_block_unpadded_size(&thr->block_options);
	assert(thr->outbuf->unpadded_size != 0);
	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

	return THR_FINISH;
}


/// Entry point of a worker thread: wait in the idle state for work,
/// encode one Block per round via worker_encode(), publish the result
/// to the main thread, and exit once the state becomes THR_EXIT.
static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
	worker_thread *thr = thr_ptr;
	worker_state state = THR_IDLE; // Init to silence a warning

	while (true) {
		// Wait for work.
		mythread_sync(thr->mutex) {
			while (true) {
				// The thread is already idle so if we are
				// requested to stop, just set the state.
				if (thr->state == THR_STOP) {
					thr->state = THR_IDLE;
					mythread_cond_signal(&thr->cond);
				}

				state = thr->state;
				if (state != THR_IDLE)
					break;

				mythread_cond_wait(&thr->cond, &thr->mutex);
			}
		}

		size_t out_pos = 0;

		assert(state != THR_IDLE);
		assert(state != THR_STOP);

		if (state <= THR_FINISH)
			state = worker_encode(thr, &out_pos, state);

		if (state == THR_EXIT)
			break;

		// Mark the thread as idle unless the main thread has
		// told us to exit. Signal is needed for the case
		// where the main thread is waiting for the threads to stop.
		mythread_sync(thr->mutex) {
			if (thr->state != THR_EXIT) {
				thr->state = THR_IDLE;
				mythread_cond_signal(&thr->cond);
			}
		}

		mythread_sync(thr->coder->mutex) {
			// If no errors occurred, make the encoded data
			// available to be copied out.
			if (state == THR_FINISH) {
				thr->outbuf->pos = out_pos;
				thr->outbuf->finished = true;
			}

			// Update the main progress info.
			thr->coder->progress_in
					+= thr->outbuf->uncompressed_size;
			thr->coder->progress_out += out_pos;
			thr->progress_in = 0;
			thr->progress_out = 0;

			// Return this thread to the stack of free threads.
			thr->next = thr->coder->threads_free;
			thr->coder->threads_free = thr;

			mythread_cond_signal(&thr->coder->cond);
		}
	}

	// Exiting, free the resources.
	lzma_filters_free(thr->filters, thr->allocator);

	mythread_mutex_destroy(&thr->mutex);
	mythread_cond_destroy(&thr->cond);

	lzma_next_end(&thr->block_encoder, thr->allocator);
	lzma_free(thr->in, thr->allocator);
	return MYTHREAD_RET_VALUE;
}


/// Make the threads stop but not exit. Optionally wait for them to stop.
static void
threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
{
	// Tell the threads to stop.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_STOP;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	if (!wait_for_threads)
		return;

	// Wait for the threads to settle in the idle state.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			while (coder->threads[i].state != THR_IDLE)
				mythread_cond_wait(&coder->threads[i].cond,
						&coder->threads[i].mutex);
		}
	}

	return;
}


/// Stop the threads and free the resources associated with them.
/// Wait until the threads have exited.
static void
threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_EXIT;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		int ret = mythread_join(coder->threads[i].thread_id);
		assert(ret == 0);
		(void)ret;
	}

	lzma_free(coder->threads, allocator);
	return;
}


/// Initialize a new worker_thread structure and create a new thread.
static lzma_ret
initialize_new_thread(lzma_stream_coder *coder,
		const lzma_allocator *allocator)
{
	worker_thread *thr = &coder->threads[coder->threads_initialized];

	thr->in = lzma_alloc(coder->block_size, allocator);
	if (thr->in == NULL)
		return LZMA_MEM_ERROR;

	if (mythread_mutex_init(&thr->mutex))
		goto error_mutex;

	if (mythread_cond_init(&thr->cond))
		goto error_cond;

	thr->state = THR_IDLE;
	thr->allocator = allocator;
	thr->coder = coder;
	thr->progress_in = 0;
	thr->progress_out = 0;
	thr->block_encoder = LZMA_NEXT_CODER_INIT;
	thr->filters[0].id = LZMA_VLI_UNKNOWN;

	if (mythread_create(&thr->thread_id, &worker_start, thr))
		goto error_thread;

	++coder->threads_initialized;
	coder->thr = thr;

	return LZMA_OK;

// goto-based cleanup: release only what was successfully acquired.
error_thread:
	mythread_cond_destroy(&thr->cond);

error_cond:
	mythread_mutex_destroy(&thr->mutex);

error_mutex:
	lzma_free(thr->in, allocator);
	return LZMA_MEM_ERROR;
}


/// Try to set coder->thr to point to a worker thread that is ready to
/// take a new Block: reuse a free thread from the stack or create a new
/// one. Returns LZMA_OK also when no thread could be made available yet
/// (coder->thr stays NULL in that case).
static lzma_ret
get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
	// If there are no free output subqueues, there is no
	// point to try getting a thread.
	if (!lzma_outq_has_buf(&coder->outq))
		return LZMA_OK;

	// That's also true if we cannot allocate memory for the output
	// buffer in the output queue.
	return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
			coder->outbuf_alloc_size));

	// Make a thread-specific copy of the filter chain. Put it in
	// the cache array first so that if we cannot get a new thread yet,
	// the allocation is ready when we try again.
	if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN)
		return_if_error(lzma_filters_copy(
			coder->filters, coder->filters_cache, allocator));

	// If there is a free structure on the stack, use it.
	mythread_sync(coder->mutex) {
		if (coder->threads_free != NULL) {
			coder->thr = coder->threads_free;
			coder->threads_free = coder->threads_free->next;
		}
	}

	if (coder->thr == NULL) {
		// If there are no uninitialized structures left, return.
		if (coder->threads_initialized == coder->threads_max)
			return LZMA_OK;

		// Initialize a new thread.
		return_if_error(initialize_new_thread(coder, allocator));
	}

	// Reset the parts of the thread state that have to be done
	// in the main thread.
	mythread_sync(coder->thr->mutex) {
		coder->thr->state = THR_RUN;
		coder->thr->in_size = 0;
		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);

		// Free the old thread-specific filter options and replace
		// them with the already-allocated new options from
		// coder->filters_cache[]. Then mark the cache as empty.
		lzma_filters_free(coder->thr->filters, allocator);
		memcpy(coder->thr->filters, coder->filters_cache,
				sizeof(coder->filters_cache));
		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;

		mythread_cond_signal(&coder->thr->cond);
	}

	return LZMA_OK;
}


/// Copy input from the application to the current worker thread's
/// buffer, starting new Blocks (and worker threads) as needed. Returns
/// a pending worker-thread error if one has occurred.
static lzma_ret
stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, lzma_action action)
{
	while (*in_pos < in_size
			|| (coder->thr != NULL && action != LZMA_RUN)) {
		if (coder->thr == NULL) {
			// Get a new thread.
			const lzma_ret ret = get_thread(coder, allocator);
			if (coder->thr == NULL)
				return ret;
		}

		// Copy the input data to thread's buffer.
		size_t thr_in_size = coder->thr->in_size;
		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
				&thr_in_size, coder->block_size);

		// Tell the Block encoder to finish if
		//  - it has got block_size bytes of input; or
		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
		//    or LZMA_FULL_BARRIER was used.
		//
		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
		const bool finish = thr_in_size == coder->block_size
				|| (*in_pos == in_size && action != LZMA_RUN);

		bool block_error = false;

		mythread_sync(coder->thr->mutex) {
			if (coder->thr->state == THR_IDLE) {
				// Something has gone wrong with the Block
				// encoder. It has set coder->thread_error
				// which we will read a few lines later.
				block_error = true;
			} else {
				// Tell the Block encoder its new amount
				// of input and update the state if needed.
				coder->thr->in_size = thr_in_size;

				if (finish)
					coder->thr->state = THR_FINISH;

				mythread_cond_signal(&coder->thr->cond);
			}
		}

		if (block_error) {
			lzma_ret ret;

			mythread_sync(coder->mutex) {
				ret = coder->thread_error;
			}

			return ret;
		}

		if (finish)
			coder->thr = NULL;
	}

	return LZMA_OK;
}


/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached.
static bool
wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
		bool *has_blocked, bool has_input)
{
	if (coder->timeout != 0 && !*has_blocked) {
		// Every time when stream_encode_mt() is called via
		// lzma_code(), *has_blocked starts as false. We set it
		// to true here and calculate the absolute time when
		// we must return if there's nothing to do.
		//
		// This way if we block multiple times for short moments
		// less than "timeout" milliseconds, we will return once
		// "timeout" amount of time has passed since the *first*
		// blocking occurred. If the absolute time was calculated
		// again every time we block, "timeout" would effectively
		// be meaningless if we never consecutively block longer
		// than "timeout" ms.
		*has_blocked = true;
		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
	}

	bool timed_out = false;

	mythread_sync(coder->mutex) {
		// There are four things that we wait for. If one of them
		// becomes possible, we return.
		//  - If there is input left, we need to get a free
		//    worker thread and an output buffer for it.
		//  - Data ready to be read from the output queue.
		//  - A worker thread indicates an error.
		//  - Time out occurs.
		while ((!has_input || coder->threads_free == NULL
					|| !lzma_outq_has_buf(&coder->outq))
				&& !lzma_outq_is_readable(&coder->outq)
				&& coder->thread_error == LZMA_OK
				&& !timed_out) {
			if (coder->timeout != 0)
				timed_out = mythread_cond_timedwait(
						&coder->cond, &coder->mutex,
						wait_abs) != 0;
			else
				mythread_cond_wait(&coder->cond,
						&coder->mutex);
		}
	}

	return timed_out;
}


/// Main encoding function called via lzma_code(). Walks through the
/// sequence Stream Header -> Blocks -> Index -> Stream Footer.
static lzma_ret
stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, uint8_t *restrict out,
		size_t *restrict out_pos, size_t out_size, lzma_action action)
{
	lzma_stream_coder *coder = coder_ptr;

	switch (coder->sequence) {
	case SEQ_STREAM_HEADER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		if (coder->header_pos < sizeof(coder->header))
			return LZMA_OK;

		coder->header_pos = 0;
		coder->sequence = SEQ_BLOCK;

	// Fall through

	case SEQ_BLOCK: {
		// Initialized to silence warnings.
		lzma_vli unpadded_size = 0;
		lzma_vli uncompressed_size = 0;
		lzma_ret ret = LZMA_OK;

		// These are for wait_for_work().
		bool has_blocked = false;
		mythread_condtime wait_abs;

		while (true) {
			mythread_sync(coder->mutex) {
				// Check for Block encoder errors.
				ret = coder->thread_error;
				if (ret != LZMA_OK) {
					assert(ret != LZMA_STREAM_END);
					break; // Break out of mythread_sync.
				}

				// Try to read compressed data to out[].
				ret = lzma_outq_read(&coder->outq, allocator,
						out, out_pos, out_size,
						&unpadded_size,
						&uncompressed_size);
			}

			if (ret == LZMA_STREAM_END) {
				// End of Block. Add it to the Index.
				ret = lzma_index_append(coder->index,
						allocator, unpadded_size,
						uncompressed_size);
				if (ret != LZMA_OK) {
					threads_stop(coder, false);
					return ret;
				}

				// If we didn't fill the output buffer yet,
				// try to read more data. Maybe the next
				// outbuf has been finished already too.
				if (*out_pos < out_size)
					continue;
			}

			if (ret != LZMA_OK) {
				// coder->thread_error was set.
				threads_stop(coder, false);
				return ret;
			}

			// Try to give uncompressed data to a worker thread.
			ret = stream_encode_in(coder, allocator,
					in, in_pos, in_size, action);
			if (ret != LZMA_OK) {
				threads_stop(coder, false);
				return ret;
			}

			// See if we should wait or return.
			//
			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
			if (*in_pos == in_size) {
				// LZMA_RUN: More data is probably coming
				// so return to let the caller fill the
				// input buffer.
				if (action == LZMA_RUN)
					return LZMA_OK;

				// LZMA_FULL_BARRIER: The same as with
				// LZMA_RUN but tell the caller that the
				// barrier was completed.
				if (action == LZMA_FULL_BARRIER)
					return LZMA_STREAM_END;

				// Finishing or flushing isn't completed until
				// all input data has been encoded and copied
				// to the output buffer.
				if (lzma_outq_is_empty(&coder->outq)) {
					// LZMA_FINISH: Continue to encode
					// the Index field.
					if (action == LZMA_FINISH)
						break;

					// LZMA_FULL_FLUSH: Return to tell
					// the caller that flushing was
					// completed.
					if (action == LZMA_FULL_FLUSH)
						return LZMA_STREAM_END;
				}
			}

			// Return if there is no output space left.
			// This check must be done after testing the input
			// buffer, because we might want to use a different
			// return code.
			if (*out_pos == out_size)
				return LZMA_OK;

			// Neither in nor out has been used completely.
			// Wait until there's something we can do.
			if (wait_for_work(coder, &wait_abs, &has_blocked,
					*in_pos < in_size))
				return LZMA_TIMED_OUT;
		}

		// All Blocks have been encoded and the threads have stopped.
		// Prepare to encode the Index field.
		return_if_error(lzma_index_encoder_init(
				&coder->index_encoder, allocator,
				coder->index));
		coder->sequence = SEQ_INDEX;

		// Update the progress info to take the Index and
		// Stream Footer into account. Those are very fast to encode
		// so in terms of progress information they can be thought
		// to be ready to be copied out.
		coder->progress_out += lzma_index_size(coder->index)
				+ LZMA_STREAM_HEADER_SIZE;
	}

	// Fall through

	case SEQ_INDEX: {
		// Call the Index encoder. It doesn't take any input, so
		// those pointers can be NULL.
		const lzma_ret ret = coder->index_encoder.code(
				coder->index_encoder.coder, allocator,
				NULL, NULL, 0,
				out, out_pos, out_size, LZMA_RUN);
		if (ret != LZMA_STREAM_END)
			return ret;

		// Encode the Stream Footer into coder->buffer.
		coder->stream_flags.backward_size
				= lzma_index_size(coder->index);
		if (lzma_stream_footer_encode(&coder->stream_flags,
				coder->header) != LZMA_OK)
			return LZMA_PROG_ERROR;

		coder->sequence = SEQ_STREAM_FOOTER;
	}

	// Fall through

	case SEQ_STREAM_FOOTER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		return coder->header_pos < sizeof(coder->header)
				? LZMA_OK : LZMA_STREAM_END;
	}

	assert(0);
	return LZMA_PROG_ERROR;
}


/// Free all resources owned by the coder. The worker threads are ended
/// first because they reference the output queue and the allocator.
static void
stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
{
	lzma_stream_coder *coder = coder_ptr;

	// Threads must be killed before the output queue can be freed.
	threads_end(coder, allocator);
	lzma_outq_end(&coder->outq, allocator);

	lzma_filters_free(coder->filters, allocator);
	lzma_filters_free(coder->filters_cache, allocator);

	lzma_next_end(&coder->index_encoder, allocator);
	lzma_index_end(coder->index, allocator);

	mythread_cond_destroy(&coder->cond);
	mythread_mutex_destroy(&coder->mutex);

	lzma_free(coder, allocator);
	return;
}


/// Change the filter chain for the next Block (lzma_filters_update()).
static lzma_ret
stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator,
		const lzma_filter *filters,
		const lzma_filter *reversed_filters
			lzma_attribute((__unused__)))
{
	lzma_stream_coder *coder = coder_ptr;

	// Applications shouldn't attempt to change the options when
	// we are already encoding the Index or Stream Footer.
	if (coder->sequence > SEQ_BLOCK)
		return LZMA_PROG_ERROR;

	// For now the threaded encoder doesn't support changing
	// the options in the middle of a Block.
	if (coder->thr != NULL)
		return LZMA_PROG_ERROR;

	// Check if the filter chain seems mostly valid. See the comment
	// in stream_encoder_mt_init().
	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Make a copy to a temporary buffer first. This way the encoder
	// state stays unchanged if an error occurs in lzma_filters_copy().
	lzma_filter temp[LZMA_FILTERS_MAX + 1];
	return_if_error(lzma_filters_copy(filters, temp, allocator));

	// Free the options of the old chain as well as the cache.
	lzma_filters_free(coder->filters, allocator);
	lzma_filters_free(coder->filters_cache, allocator);

	// Copy the new filter chain in place.
	memcpy(coder->filters, temp, sizeof(temp));

	return LZMA_OK;
}


/// Options handling for lzma_stream_encoder_mt_init() and
/// lzma_stream_encoder_mt_memusage()
static lzma_ret
get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
		const lzma_filter **filters, uint64_t *block_size,
		uint64_t *outbuf_size_max)
{
	// Validate some of the options.
	if (options == NULL)
		return LZMA_PROG_ERROR;

	if (options->flags != 0 || options->threads == 0
			|| options->threads > LZMA_THREADS_MAX)
		return LZMA_OPTIONS_ERROR;

	if (options->filters != NULL) {
		// Filter chain was given, use it as is.
		*filters = options->filters;
	} else {
		// Use a preset.
		if (lzma_easy_preset(opt_easy, options->preset))
			return LZMA_OPTIONS_ERROR;

		*filters = opt_easy->filters;
	}

	// Block size
	if (options->block_size > 0) {
		if (options->block_size > BLOCK_SIZE_MAX)
			return LZMA_OPTIONS_ERROR;

		*block_size = options->block_size;
	} else {
		// Determine the Block size from the filter chain.
		*block_size = lzma_mt_block_size(*filters);
		if (*block_size == 0)
			return LZMA_OPTIONS_ERROR;

		assert(*block_size <= BLOCK_SIZE_MAX);
	}

	// Calculate the maximum amount output that a single output buffer
	// may need to hold. This is the same as the maximum total size of
	// a Block.
	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
	if (*outbuf_size_max == 0)
		return LZMA_MEM_ERROR;

	return LZMA_OK;
}


/// Collect the total progress info for lzma_get_progress().
static void
get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
{
	lzma_stream_coder *coder = coder_ptr;

	// Lock coder->mutex to prevent finishing threads from moving their
	// progress info from the worker_thread structure to lzma_stream_coder.
	mythread_sync(coder->mutex) {
		*progress_in = coder->progress_in;
		*progress_out = coder->progress_out;

		for (size_t i = 0; i < coder->threads_initialized; ++i) {
			mythread_sync(coder->threads[i].mutex) {
				*progress_in += coder->threads[i].progress_in;
				*progress_out += coder->threads[i]
						.progress_out;
			}
		}
	}

	return;
}


static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
		const lzma_mt *options)
{
	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);

	// Get the filter chain.
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;
	return_if_error(get_options(options, &easy, &filters,
			&block_size, &outbuf_size_max));

#if SIZE_MAX < UINT64_MAX
	if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
		return LZMA_MEM_ERROR;
#endif

	// Validate the filter chain so that we can give an error in this
	// function instead of delaying it to the first call to lzma_code().
	// The memory usage calculation verifies the filter chain as
	// a side effect so we take advantage of that. It's not a perfect
	// check though as raw encoder allows LZMA1 too but such problems
	// will be caught eventually with Block Header encoder.
	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Validate the Check ID.
	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
		return LZMA_PROG_ERROR;

	if (!lzma_check_is_supported(options->check))
		return LZMA_UNSUPPORTED_CHECK;

	// Allocate and initialize the base structure if needed.
	lzma_stream_coder *coder = next->coder;
	if (coder == NULL) {
		coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
		if (coder == NULL)
			return LZMA_MEM_ERROR;

		next->coder = coder;

		// For the mutex and condition variable initializations
		// the error handling has to be done here because
		// stream_encoder_mt_end() doesn't know if they have
		// already been initialized or not.
		if (mythread_mutex_init(&coder->mutex)) {
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		if (mythread_cond_init(&coder->cond)) {
			mythread_mutex_destroy(&coder->mutex);
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		next->code = &stream_encode_mt;
		next->end = &stream_encoder_mt_end;
		next->get_progress = &get_progress;
		next->update = &stream_encoder_mt_update;

		coder->filters[0].id = LZMA_VLI_UNKNOWN;
		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
		coder->index_encoder = LZMA_NEXT_CODER_INIT;
		coder->index = NULL;
		memzero(&coder->outq, sizeof(coder->outq));
		coder->threads = NULL;
		coder->threads_max = 0;
		coder->threads_initialized = 0;
	}

	// Basic initializations
	coder->sequence = SEQ_STREAM_HEADER;
	coder->block_size = (size_t)(block_size);
	coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
	coder->thread_error = LZMA_OK;
	coder->thr = NULL;

	// Allocate the thread-specific base structures.
	assert(options->threads > 0);
	if (coder->threads_max != options->threads) {
		threads_end(coder, allocator);

		coder->threads = NULL;
		coder->threads_max = 0;

		coder->threads_initialized = 0;
		coder->threads_free = NULL;

		coder->threads = lzma_alloc(
				options->threads * sizeof(worker_thread),
				allocator);
		if (coder->threads == NULL)
			return LZMA_MEM_ERROR;

		coder->threads_max = options->threads;
	} else {
		// Reuse the old structures and threads. Tell the running
		// threads to stop and wait until they have stopped.
		threads_stop(coder, true);
	}

	// Output queue
	return_if_error(lzma_outq_init(&coder->outq, allocator,
			options->threads));

	// Timeout
	coder->timeout = options->timeout;

	// Free the old filter chain and the cache.
	lzma_filters_free(coder->filters, allocator);
	lzma_filters_free(coder->filters_cache, allocator);

	// Copy the new filter chain.
	return_if_error(lzma_filters_copy(
			filters, coder->filters, allocator));

	// Index
	lzma_index_end(coder->index, allocator);
	coder->index = lzma_index_init(allocator);
	if (coder->index == NULL)
		return LZMA_MEM_ERROR;

	// Stream Header
	coder->stream_flags.version = 0;
	coder->stream_flags.check = options->check;
	return_if_error(lzma_stream_header_encode(
			&coder->stream_flags, coder->header));

	coder->header_pos = 0;

	// Progress info
	coder->progress_in = 0;
	coder->progress_out = LZMA_STREAM_HEADER_SIZE;

	return LZMA_OK;
}


#ifdef HAVE_SYMBOL_VERSIONS_LINUX
// These are for compatibility with binaries linked against liblzma that
// has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
1179 // Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2 1180 // but it has been added here anyway since someone might misread the 1181 // RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist. 1182 LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha", 1183 lzma_ret, lzma_stream_encoder_mt_512a)( 1184 lzma_stream *strm, const lzma_mt *options) 1185 lzma_nothrow lzma_attr_warn_unused_result 1186 __attribute__((__alias__("lzma_stream_encoder_mt_52"))); 1187 1188 LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2", 1189 lzma_ret, lzma_stream_encoder_mt_522)( 1190 lzma_stream *strm, const lzma_mt *options) 1191 lzma_nothrow lzma_attr_warn_unused_result 1192 __attribute__((__alias__("lzma_stream_encoder_mt_52"))); 1193 1194 LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2", 1195 lzma_ret, lzma_stream_encoder_mt_52)( 1196 lzma_stream *strm, const lzma_mt *options) 1197 lzma_nothrow lzma_attr_warn_unused_result; 1198 1199 #define lzma_stream_encoder_mt lzma_stream_encoder_mt_52 1200 #endif 1201 extern LZMA_API(lzma_ret) 1202 lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options) 1203 { 1204 lzma_next_strm_init(stream_encoder_mt_init, strm, options); 1205 1206 strm->internal->supported_actions[LZMA_RUN] = true; 1207 // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true; 1208 strm->internal->supported_actions[LZMA_FULL_FLUSH] = true; 1209 strm->internal->supported_actions[LZMA_FULL_BARRIER] = true; 1210 strm->internal->supported_actions[LZMA_FINISH] = true; 1211 1212 return LZMA_OK; 1213 } 1214 1215 1216 #ifdef HAVE_SYMBOL_VERSIONS_LINUX 1217 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha", 1218 uint64_t, lzma_stream_encoder_mt_memusage_512a)( 1219 const lzma_mt *options) lzma_nothrow lzma_attr_pure 1220 __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52"))); 1221 1222 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2", 1223 uint64_t, lzma_stream_encoder_mt_memusage_522)( 1224 const lzma_mt *options) 
lzma_nothrow lzma_attr_pure 1225 __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52"))); 1226 1227 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2", 1228 uint64_t, lzma_stream_encoder_mt_memusage_52)( 1229 const lzma_mt *options) lzma_nothrow lzma_attr_pure; 1230 1231 #define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52 1232 #endif 1233 // This function name is a monster but it's consistent with the older 1234 // monster names. :-( 31 chars is the max that C99 requires so in that 1235 // sense it's not too long. ;-) 1236 extern LZMA_API(uint64_t) 1237 lzma_stream_encoder_mt_memusage(const lzma_mt *options) 1238 { 1239 lzma_options_easy easy; 1240 const lzma_filter *filters; 1241 uint64_t block_size; 1242 uint64_t outbuf_size_max; 1243 1244 if (get_options(options, &easy, &filters, &block_size, 1245 &outbuf_size_max) != LZMA_OK) 1246 return UINT64_MAX; 1247 1248 // Memory usage of the input buffers 1249 const uint64_t inbuf_memusage = options->threads * block_size; 1250 1251 // Memory usage of the filter encoders 1252 uint64_t filters_memusage = lzma_raw_encoder_memusage(filters); 1253 if (filters_memusage == UINT64_MAX) 1254 return UINT64_MAX; 1255 1256 filters_memusage *= options->threads; 1257 1258 // Memory usage of the output queue 1259 const uint64_t outq_memusage = lzma_outq_memusage( 1260 outbuf_size_max, options->threads); 1261 if (outq_memusage == UINT64_MAX) 1262 return UINT64_MAX; 1263 1264 // Sum them with overflow checking. 
1265 uint64_t total_memusage = LZMA_MEMUSAGE_BASE 1266 + sizeof(lzma_stream_coder) 1267 + options->threads * sizeof(worker_thread); 1268 1269 if (UINT64_MAX - total_memusage < inbuf_memusage) 1270 return UINT64_MAX; 1271 1272 total_memusage += inbuf_memusage; 1273 1274 if (UINT64_MAX - total_memusage < filters_memusage) 1275 return UINT64_MAX; 1276 1277 total_memusage += filters_memusage; 1278 1279 if (UINT64_MAX - total_memusage < outq_memusage) 1280 return UINT64_MAX; 1281 1282 return total_memusage + outq_memusage; 1283 } 1284