1 /////////////////////////////////////////////////////////////////////////////// 2 // 3 /// \file stream_encoder_mt.c 4 /// \brief Multithreaded .xz Stream encoder 5 // 6 // Author: Lasse Collin 7 // 8 // This file has been put into the public domain. 9 // You can do whatever you want with this file. 10 // 11 /////////////////////////////////////////////////////////////////////////////// 12 13 #include "filter_encoder.h" 14 #include "easy_preset.h" 15 #include "block_encoder.h" 16 #include "block_buffer_encoder.h" 17 #include "index_encoder.h" 18 #include "outqueue.h" 19 20 21 /// Maximum supported block size. This makes it simpler to prevent integer 22 /// overflows if we are given unusually large block size. 23 #define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX) 24 25 26 typedef enum { 27 /// Waiting for work. 28 THR_IDLE, 29 30 /// Encoding is in progress. 31 THR_RUN, 32 33 /// Encoding is in progress but no more input data will 34 /// be read. 35 THR_FINISH, 36 37 /// The main thread wants the thread to stop whatever it was doing 38 /// but not exit. 39 THR_STOP, 40 41 /// The main thread wants the thread to exit. We could use 42 /// cancellation but since there's stopped anyway, this is lazier. 43 THR_EXIT, 44 45 } worker_state; 46 47 typedef struct lzma_stream_coder_s lzma_stream_coder; 48 49 typedef struct worker_thread_s worker_thread; 50 struct worker_thread_s { 51 worker_state state; 52 53 /// Input buffer of coder->block_size bytes. The main thread will 54 /// put new input into this and update in_size accordingly. Once 55 /// no more input is coming, state will be set to THR_FINISH. 56 uint8_t *in; 57 58 /// Amount of data available in the input buffer. This is modified 59 /// only by the main thread. 60 size_t in_size; 61 62 /// Output buffer for this thread. This is set by the main 63 /// thread every time a new Block is started with this thread 64 /// structure. 
65 lzma_outbuf *outbuf; 66 67 /// Pointer to the main structure is needed when putting this 68 /// thread back to the stack of free threads. 69 lzma_stream_coder *coder; 70 71 /// The allocator is set by the main thread. Since a copy of the 72 /// pointer is kept here, the application must not change the 73 /// allocator before calling lzma_end(). 74 const lzma_allocator *allocator; 75 76 /// Amount of uncompressed data that has already been compressed. 77 uint64_t progress_in; 78 79 /// Amount of compressed data that is ready. 80 uint64_t progress_out; 81 82 /// Block encoder 83 lzma_next_coder block_encoder; 84 85 /// Compression options for this Block 86 lzma_block block_options; 87 88 /// Filter chain for this thread. By copying the filters array 89 /// to each thread it is possible to change the filter chain 90 /// between Blocks using lzma_filters_update(). 91 lzma_filter filters[LZMA_FILTERS_MAX + 1]; 92 93 /// Next structure in the stack of free worker threads. 94 worker_thread *next; 95 96 mythread_mutex mutex; 97 mythread_cond cond; 98 99 /// The ID of this thread is used to join the thread 100 /// when it's not needed anymore. 101 mythread thread_id; 102 }; 103 104 105 struct lzma_stream_coder_s { 106 enum { 107 SEQ_STREAM_HEADER, 108 SEQ_BLOCK, 109 SEQ_INDEX, 110 SEQ_STREAM_FOOTER, 111 } sequence; 112 113 /// Start a new Block every block_size bytes of input unless 114 /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier. 115 size_t block_size; 116 117 /// The filter chain to use for the next Block. 118 /// This can be updated using lzma_filters_update() 119 /// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH. 120 lzma_filter filters[LZMA_FILTERS_MAX + 1]; 121 122 /// A copy of filters[] will be put here when attempting to get 123 /// a new worker thread. This will be copied to a worker thread 124 /// when a thread becomes free and then this cache is marked as 125 /// empty by setting [0].id = LZMA_VLI_UNKNOWN. 
Without this cache 126 /// the filter options from filters[] would get uselessly copied 127 /// multiple times (allocated and freed) when waiting for a new free 128 /// worker thread. 129 /// 130 /// This is freed if filters[] is updated via lzma_filters_update(). 131 lzma_filter filters_cache[LZMA_FILTERS_MAX + 1]; 132 133 134 /// Index to hold sizes of the Blocks 135 lzma_index *index; 136 137 /// Index encoder 138 lzma_next_coder index_encoder; 139 140 141 /// Stream Flags for encoding the Stream Header and Stream Footer. 142 lzma_stream_flags stream_flags; 143 144 /// Buffer to hold Stream Header and Stream Footer. 145 uint8_t header[LZMA_STREAM_HEADER_SIZE]; 146 147 /// Read position in header[] 148 size_t header_pos; 149 150 151 /// Output buffer queue for compressed data 152 lzma_outq outq; 153 154 /// How much memory to allocate for each lzma_outbuf.buf 155 size_t outbuf_alloc_size; 156 157 158 /// Maximum wait time if cannot use all the input and cannot 159 /// fill the output buffer. This is in milliseconds. 160 uint32_t timeout; 161 162 163 /// Error code from a worker thread 164 lzma_ret thread_error; 165 166 /// Array of allocated thread-specific structures 167 worker_thread *threads; 168 169 /// Number of structures in "threads" above. This is also the 170 /// number of threads that will be created at maximum. 171 uint32_t threads_max; 172 173 /// Number of thread structures that have been initialized, and 174 /// thus the number of worker threads actually created so far. 175 uint32_t threads_initialized; 176 177 /// Stack of free threads. When a thread finishes, it puts itself 178 /// back into this stack. This starts as empty because threads 179 /// are created only when actually needed. 180 worker_thread *threads_free; 181 182 /// The most recent worker thread to which the main thread writes 183 /// the new input from the application. 
184 worker_thread *thr; 185 186 187 /// Amount of uncompressed data in Blocks that have already 188 /// been finished. 189 uint64_t progress_in; 190 191 /// Amount of compressed data in Stream Header + Blocks that 192 /// have already been finished. 193 uint64_t progress_out; 194 195 196 mythread_mutex mutex; 197 mythread_cond cond; 198 }; 199 200 201 /// Tell the main thread that something has gone wrong. 202 static void 203 worker_error(worker_thread *thr, lzma_ret ret) 204 { 205 assert(ret != LZMA_OK); 206 assert(ret != LZMA_STREAM_END); 207 208 mythread_sync(thr->coder->mutex) { 209 if (thr->coder->thread_error == LZMA_OK) 210 thr->coder->thread_error = ret; 211 212 mythread_cond_signal(&thr->coder->cond); 213 } 214 215 return; 216 } 217 218 219 static worker_state 220 worker_encode(worker_thread *thr, size_t *out_pos, worker_state state) 221 { 222 assert(thr->progress_in == 0); 223 assert(thr->progress_out == 0); 224 225 // Set the Block options. 226 thr->block_options = (lzma_block){ 227 .version = 0, 228 .check = thr->coder->stream_flags.check, 229 .compressed_size = thr->outbuf->allocated, 230 .uncompressed_size = thr->coder->block_size, 231 .filters = thr->filters, 232 }; 233 234 // Calculate maximum size of the Block Header. This amount is 235 // reserved in the beginning of the buffer so that Block Header 236 // along with Compressed Size and Uncompressed Size can be 237 // written there. 238 lzma_ret ret = lzma_block_header_size(&thr->block_options); 239 if (ret != LZMA_OK) { 240 worker_error(thr, ret); 241 return THR_STOP; 242 } 243 244 // Initialize the Block encoder. 
245 ret = lzma_block_encoder_init(&thr->block_encoder, 246 thr->allocator, &thr->block_options); 247 if (ret != LZMA_OK) { 248 worker_error(thr, ret); 249 return THR_STOP; 250 } 251 252 size_t in_pos = 0; 253 size_t in_size = 0; 254 255 *out_pos = thr->block_options.header_size; 256 const size_t out_size = thr->outbuf->allocated; 257 258 do { 259 mythread_sync(thr->mutex) { 260 // Store in_pos and *out_pos into *thr so that 261 // an application may read them via 262 // lzma_get_progress() to get progress information. 263 // 264 // NOTE: These aren't updated when the encoding 265 // finishes. Instead, the final values are taken 266 // later from thr->outbuf. 267 thr->progress_in = in_pos; 268 thr->progress_out = *out_pos; 269 270 while (in_size == thr->in_size 271 && thr->state == THR_RUN) 272 mythread_cond_wait(&thr->cond, &thr->mutex); 273 274 state = thr->state; 275 in_size = thr->in_size; 276 } 277 278 // Return if we were asked to stop or exit. 279 if (state >= THR_STOP) 280 return state; 281 282 lzma_action action = state == THR_FINISH 283 ? LZMA_FINISH : LZMA_RUN; 284 285 // Limit the amount of input given to the Block encoder 286 // at once. This way this thread can react fairly quickly 287 // if the main thread wants us to stop or exit. 288 static const size_t in_chunk_max = 16384; 289 size_t in_limit = in_size; 290 if (in_size - in_pos > in_chunk_max) { 291 in_limit = in_pos + in_chunk_max; 292 action = LZMA_RUN; 293 } 294 295 ret = thr->block_encoder.code( 296 thr->block_encoder.coder, thr->allocator, 297 thr->in, &in_pos, in_limit, thr->outbuf->buf, 298 out_pos, out_size, action); 299 } while (ret == LZMA_OK && *out_pos < out_size); 300 301 switch (ret) { 302 case LZMA_STREAM_END: 303 assert(state == THR_FINISH); 304 305 // Encode the Block Header. By doing it after 306 // the compression, we can store the Compressed Size 307 // and Uncompressed Size fields. 
308 ret = lzma_block_header_encode(&thr->block_options, 309 thr->outbuf->buf); 310 if (ret != LZMA_OK) { 311 worker_error(thr, ret); 312 return THR_STOP; 313 } 314 315 break; 316 317 case LZMA_OK: 318 // The data was incompressible. Encode it using uncompressed 319 // LZMA2 chunks. 320 // 321 // First wait that we have gotten all the input. 322 mythread_sync(thr->mutex) { 323 while (thr->state == THR_RUN) 324 mythread_cond_wait(&thr->cond, &thr->mutex); 325 326 state = thr->state; 327 in_size = thr->in_size; 328 } 329 330 if (state >= THR_STOP) 331 return state; 332 333 // Do the encoding. This takes care of the Block Header too. 334 *out_pos = 0; 335 ret = lzma_block_uncomp_encode(&thr->block_options, 336 thr->in, in_size, thr->outbuf->buf, 337 out_pos, out_size); 338 339 // It shouldn't fail. 340 if (ret != LZMA_OK) { 341 worker_error(thr, LZMA_PROG_ERROR); 342 return THR_STOP; 343 } 344 345 break; 346 347 default: 348 worker_error(thr, ret); 349 return THR_STOP; 350 } 351 352 // Set the size information that will be read by the main thread 353 // to write the Index field. 354 thr->outbuf->unpadded_size 355 = lzma_block_unpadded_size(&thr->block_options); 356 assert(thr->outbuf->unpadded_size != 0); 357 thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size; 358 359 return THR_FINISH; 360 } 361 362 363 static MYTHREAD_RET_TYPE 364 worker_start(void *thr_ptr) 365 { 366 worker_thread *thr = thr_ptr; 367 worker_state state = THR_IDLE; // Init to silence a warning 368 369 while (true) { 370 // Wait for work. 371 mythread_sync(thr->mutex) { 372 while (true) { 373 // The thread is already idle so if we are 374 // requested to stop, just set the state. 
375 if (thr->state == THR_STOP) { 376 thr->state = THR_IDLE; 377 mythread_cond_signal(&thr->cond); 378 } 379 380 state = thr->state; 381 if (state != THR_IDLE) 382 break; 383 384 mythread_cond_wait(&thr->cond, &thr->mutex); 385 } 386 } 387 388 size_t out_pos = 0; 389 390 assert(state != THR_IDLE); 391 assert(state != THR_STOP); 392 393 if (state <= THR_FINISH) 394 state = worker_encode(thr, &out_pos, state); 395 396 if (state == THR_EXIT) 397 break; 398 399 // Mark the thread as idle unless the main thread has 400 // told us to exit. Signal is needed for the case 401 // where the main thread is waiting for the threads to stop. 402 mythread_sync(thr->mutex) { 403 if (thr->state != THR_EXIT) { 404 thr->state = THR_IDLE; 405 mythread_cond_signal(&thr->cond); 406 } 407 } 408 409 mythread_sync(thr->coder->mutex) { 410 // If no errors occurred, make the encoded data 411 // available to be copied out. 412 if (state == THR_FINISH) { 413 thr->outbuf->pos = out_pos; 414 thr->outbuf->finished = true; 415 } 416 417 // Update the main progress info. 418 thr->coder->progress_in 419 += thr->outbuf->uncompressed_size; 420 thr->coder->progress_out += out_pos; 421 thr->progress_in = 0; 422 thr->progress_out = 0; 423 424 // Return this thread to the stack of free threads. 425 thr->next = thr->coder->threads_free; 426 thr->coder->threads_free = thr; 427 428 mythread_cond_signal(&thr->coder->cond); 429 } 430 } 431 432 // Exiting, free the resources. 433 lzma_filters_free(thr->filters, thr->allocator); 434 435 mythread_mutex_destroy(&thr->mutex); 436 mythread_cond_destroy(&thr->cond); 437 438 lzma_next_end(&thr->block_encoder, thr->allocator); 439 lzma_free(thr->in, thr->allocator); 440 return MYTHREAD_RET_VALUE; 441 } 442 443 444 /// Make the threads stop but not exit. Optionally wait for them to stop. 445 static void 446 threads_stop(lzma_stream_coder *coder, bool wait_for_threads) 447 { 448 // Tell the threads to stop. 
449 for (uint32_t i = 0; i < coder->threads_initialized; ++i) { 450 mythread_sync(coder->threads[i].mutex) { 451 coder->threads[i].state = THR_STOP; 452 mythread_cond_signal(&coder->threads[i].cond); 453 } 454 } 455 456 if (!wait_for_threads) 457 return; 458 459 // Wait for the threads to settle in the idle state. 460 for (uint32_t i = 0; i < coder->threads_initialized; ++i) { 461 mythread_sync(coder->threads[i].mutex) { 462 while (coder->threads[i].state != THR_IDLE) 463 mythread_cond_wait(&coder->threads[i].cond, 464 &coder->threads[i].mutex); 465 } 466 } 467 468 return; 469 } 470 471 472 /// Stop the threads and free the resources associated with them. 473 /// Wait until the threads have exited. 474 static void 475 threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator) 476 { 477 for (uint32_t i = 0; i < coder->threads_initialized; ++i) { 478 mythread_sync(coder->threads[i].mutex) { 479 coder->threads[i].state = THR_EXIT; 480 mythread_cond_signal(&coder->threads[i].cond); 481 } 482 } 483 484 for (uint32_t i = 0; i < coder->threads_initialized; ++i) { 485 int ret = mythread_join(coder->threads[i].thread_id); 486 assert(ret == 0); 487 (void)ret; 488 } 489 490 lzma_free(coder->threads, allocator); 491 return; 492 } 493 494 495 /// Initialize a new worker_thread structure and create a new thread. 
496 static lzma_ret 497 initialize_new_thread(lzma_stream_coder *coder, 498 const lzma_allocator *allocator) 499 { 500 worker_thread *thr = &coder->threads[coder->threads_initialized]; 501 502 thr->in = lzma_alloc(coder->block_size, allocator); 503 if (thr->in == NULL) 504 return LZMA_MEM_ERROR; 505 506 if (mythread_mutex_init(&thr->mutex)) 507 goto error_mutex; 508 509 if (mythread_cond_init(&thr->cond)) 510 goto error_cond; 511 512 thr->state = THR_IDLE; 513 thr->allocator = allocator; 514 thr->coder = coder; 515 thr->progress_in = 0; 516 thr->progress_out = 0; 517 thr->block_encoder = LZMA_NEXT_CODER_INIT; 518 thr->filters[0].id = LZMA_VLI_UNKNOWN; 519 520 if (mythread_create(&thr->thread_id, &worker_start, thr)) 521 goto error_thread; 522 523 ++coder->threads_initialized; 524 coder->thr = thr; 525 526 return LZMA_OK; 527 528 error_thread: 529 mythread_cond_destroy(&thr->cond); 530 531 error_cond: 532 mythread_mutex_destroy(&thr->mutex); 533 534 error_mutex: 535 lzma_free(thr->in, allocator); 536 return LZMA_MEM_ERROR; 537 } 538 539 540 static lzma_ret 541 get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator) 542 { 543 // If there are no free output subqueues, there is no 544 // point to try getting a thread. 545 if (!lzma_outq_has_buf(&coder->outq)) 546 return LZMA_OK; 547 548 // That's also true if we cannot allocate memory for the output 549 // buffer in the output queue. 550 return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator, 551 coder->outbuf_alloc_size)); 552 553 // Make a thread-specific copy of the filter chain. Put it in 554 // the cache array first so that if we cannot get a new thread yet, 555 // the allocation is ready when we try again. 556 if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN) 557 return_if_error(lzma_filters_copy( 558 coder->filters, coder->filters_cache, allocator)); 559 560 // If there is a free structure on the stack, use it. 
561 mythread_sync(coder->mutex) { 562 if (coder->threads_free != NULL) { 563 coder->thr = coder->threads_free; 564 coder->threads_free = coder->threads_free->next; 565 } 566 } 567 568 if (coder->thr == NULL) { 569 // If there are no uninitialized structures left, return. 570 if (coder->threads_initialized == coder->threads_max) 571 return LZMA_OK; 572 573 // Initialize a new thread. 574 return_if_error(initialize_new_thread(coder, allocator)); 575 } 576 577 // Reset the parts of the thread state that have to be done 578 // in the main thread. 579 mythread_sync(coder->thr->mutex) { 580 coder->thr->state = THR_RUN; 581 coder->thr->in_size = 0; 582 coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL); 583 584 // Free the old thread-specific filter options and replace 585 // them with the already-allocated new options from 586 // coder->filters_cache[]. Then mark the cache as empty. 587 lzma_filters_free(coder->thr->filters, allocator); 588 memcpy(coder->thr->filters, coder->filters_cache, 589 sizeof(coder->filters_cache)); 590 coder->filters_cache[0].id = LZMA_VLI_UNKNOWN; 591 592 mythread_cond_signal(&coder->thr->cond); 593 } 594 595 return LZMA_OK; 596 } 597 598 599 static lzma_ret 600 stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator, 601 const uint8_t *restrict in, size_t *restrict in_pos, 602 size_t in_size, lzma_action action) 603 { 604 while (*in_pos < in_size 605 || (coder->thr != NULL && action != LZMA_RUN)) { 606 if (coder->thr == NULL) { 607 // Get a new thread. 608 const lzma_ret ret = get_thread(coder, allocator); 609 if (coder->thr == NULL) 610 return ret; 611 } 612 613 // Copy the input data to thread's buffer. 
614 size_t thr_in_size = coder->thr->in_size; 615 lzma_bufcpy(in, in_pos, in_size, coder->thr->in, 616 &thr_in_size, coder->block_size); 617 618 // Tell the Block encoder to finish if 619 // - it has got block_size bytes of input; or 620 // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH, 621 // or LZMA_FULL_BARRIER was used. 622 // 623 // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER. 624 const bool finish = thr_in_size == coder->block_size 625 || (*in_pos == in_size && action != LZMA_RUN); 626 627 bool block_error = false; 628 629 mythread_sync(coder->thr->mutex) { 630 if (coder->thr->state == THR_IDLE) { 631 // Something has gone wrong with the Block 632 // encoder. It has set coder->thread_error 633 // which we will read a few lines later. 634 block_error = true; 635 } else { 636 // Tell the Block encoder its new amount 637 // of input and update the state if needed. 638 coder->thr->in_size = thr_in_size; 639 640 if (finish) 641 coder->thr->state = THR_FINISH; 642 643 mythread_cond_signal(&coder->thr->cond); 644 } 645 } 646 647 if (block_error) { 648 lzma_ret ret = LZMA_OK; // Init to silence a warning. 649 650 mythread_sync(coder->mutex) { 651 ret = coder->thread_error; 652 } 653 654 return ret; 655 } 656 657 if (finish) 658 coder->thr = NULL; 659 } 660 661 return LZMA_OK; 662 } 663 664 665 /// Wait until more input can be consumed, more output can be read, or 666 /// an optional timeout is reached. 667 static bool 668 wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs, 669 bool *has_blocked, bool has_input) 670 { 671 if (coder->timeout != 0 && !*has_blocked) { 672 // Every time when stream_encode_mt() is called via 673 // lzma_code(), *has_blocked starts as false. We set it 674 // to true here and calculate the absolute time when 675 // we must return if there's nothing to do. 
676 // 677 // This way if we block multiple times for short moments 678 // less than "timeout" milliseconds, we will return once 679 // "timeout" amount of time has passed since the *first* 680 // blocking occurred. If the absolute time was calculated 681 // again every time we block, "timeout" would effectively 682 // be meaningless if we never consecutively block longer 683 // than "timeout" ms. 684 *has_blocked = true; 685 mythread_condtime_set(wait_abs, &coder->cond, coder->timeout); 686 } 687 688 bool timed_out = false; 689 690 mythread_sync(coder->mutex) { 691 // There are four things that we wait. If one of them 692 // becomes possible, we return. 693 // - If there is input left, we need to get a free 694 // worker thread and an output buffer for it. 695 // - Data ready to be read from the output queue. 696 // - A worker thread indicates an error. 697 // - Time out occurs. 698 while ((!has_input || coder->threads_free == NULL 699 || !lzma_outq_has_buf(&coder->outq)) 700 && !lzma_outq_is_readable(&coder->outq) 701 && coder->thread_error == LZMA_OK 702 && !timed_out) { 703 if (coder->timeout != 0) 704 timed_out = mythread_cond_timedwait( 705 &coder->cond, &coder->mutex, 706 wait_abs) != 0; 707 else 708 mythread_cond_wait(&coder->cond, 709 &coder->mutex); 710 } 711 } 712 713 return timed_out; 714 } 715 716 717 static lzma_ret 718 stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator, 719 const uint8_t *restrict in, size_t *restrict in_pos, 720 size_t in_size, uint8_t *restrict out, 721 size_t *restrict out_pos, size_t out_size, lzma_action action) 722 { 723 lzma_stream_coder *coder = coder_ptr; 724 725 switch (coder->sequence) { 726 case SEQ_STREAM_HEADER: 727 lzma_bufcpy(coder->header, &coder->header_pos, 728 sizeof(coder->header), 729 out, out_pos, out_size); 730 if (coder->header_pos < sizeof(coder->header)) 731 return LZMA_OK; 732 733 coder->header_pos = 0; 734 coder->sequence = SEQ_BLOCK; 735 736 // Fall through 737 738 case SEQ_BLOCK: { 739 // 
Initialized to silence warnings. 740 lzma_vli unpadded_size = 0; 741 lzma_vli uncompressed_size = 0; 742 lzma_ret ret = LZMA_OK; 743 744 // These are for wait_for_work(). 745 bool has_blocked = false; 746 mythread_condtime wait_abs = { 0 }; 747 748 while (true) { 749 mythread_sync(coder->mutex) { 750 // Check for Block encoder errors. 751 ret = coder->thread_error; 752 if (ret != LZMA_OK) { 753 assert(ret != LZMA_STREAM_END); 754 break; // Break out of mythread_sync. 755 } 756 757 // Try to read compressed data to out[]. 758 ret = lzma_outq_read(&coder->outq, allocator, 759 out, out_pos, out_size, 760 &unpadded_size, 761 &uncompressed_size); 762 } 763 764 if (ret == LZMA_STREAM_END) { 765 // End of Block. Add it to the Index. 766 ret = lzma_index_append(coder->index, 767 allocator, unpadded_size, 768 uncompressed_size); 769 if (ret != LZMA_OK) { 770 threads_stop(coder, false); 771 return ret; 772 } 773 774 // If we didn't fill the output buffer yet, 775 // try to read more data. Maybe the next 776 // outbuf has been finished already too. 777 if (*out_pos < out_size) 778 continue; 779 } 780 781 if (ret != LZMA_OK) { 782 // coder->thread_error was set. 783 threads_stop(coder, false); 784 return ret; 785 } 786 787 // Try to give uncompressed data to a worker thread. 788 ret = stream_encode_in(coder, allocator, 789 in, in_pos, in_size, action); 790 if (ret != LZMA_OK) { 791 threads_stop(coder, false); 792 return ret; 793 } 794 795 // See if we should wait or return. 796 // 797 // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER. 798 if (*in_pos == in_size) { 799 // LZMA_RUN: More data is probably coming 800 // so return to let the caller fill the 801 // input buffer. 802 if (action == LZMA_RUN) 803 return LZMA_OK; 804 805 // LZMA_FULL_BARRIER: The same as with 806 // LZMA_RUN but tell the caller that the 807 // barrier was completed. 
808 if (action == LZMA_FULL_BARRIER) 809 return LZMA_STREAM_END; 810 811 // Finishing or flushing isn't completed until 812 // all input data has been encoded and copied 813 // to the output buffer. 814 if (lzma_outq_is_empty(&coder->outq)) { 815 // LZMA_FINISH: Continue to encode 816 // the Index field. 817 if (action == LZMA_FINISH) 818 break; 819 820 // LZMA_FULL_FLUSH: Return to tell 821 // the caller that flushing was 822 // completed. 823 if (action == LZMA_FULL_FLUSH) 824 return LZMA_STREAM_END; 825 } 826 } 827 828 // Return if there is no output space left. 829 // This check must be done after testing the input 830 // buffer, because we might want to use a different 831 // return code. 832 if (*out_pos == out_size) 833 return LZMA_OK; 834 835 // Neither in nor out has been used completely. 836 // Wait until there's something we can do. 837 if (wait_for_work(coder, &wait_abs, &has_blocked, 838 *in_pos < in_size)) 839 return LZMA_TIMED_OUT; 840 } 841 842 // All Blocks have been encoded and the threads have stopped. 843 // Prepare to encode the Index field. 844 return_if_error(lzma_index_encoder_init( 845 &coder->index_encoder, allocator, 846 coder->index)); 847 coder->sequence = SEQ_INDEX; 848 849 // Update the progress info to take the Index and 850 // Stream Footer into account. Those are very fast to encode 851 // so in terms of progress information they can be thought 852 // to be ready to be copied out. 853 coder->progress_out += lzma_index_size(coder->index) 854 + LZMA_STREAM_HEADER_SIZE; 855 } 856 857 // Fall through 858 859 case SEQ_INDEX: { 860 // Call the Index encoder. It doesn't take any input, so 861 // those pointers can be NULL. 862 const lzma_ret ret = coder->index_encoder.code( 863 coder->index_encoder.coder, allocator, 864 NULL, NULL, 0, 865 out, out_pos, out_size, LZMA_RUN); 866 if (ret != LZMA_STREAM_END) 867 return ret; 868 869 // Encode the Stream Footer into coder->buffer. 
870 coder->stream_flags.backward_size 871 = lzma_index_size(coder->index); 872 if (lzma_stream_footer_encode(&coder->stream_flags, 873 coder->header) != LZMA_OK) 874 return LZMA_PROG_ERROR; 875 876 coder->sequence = SEQ_STREAM_FOOTER; 877 } 878 879 // Fall through 880 881 case SEQ_STREAM_FOOTER: 882 lzma_bufcpy(coder->header, &coder->header_pos, 883 sizeof(coder->header), 884 out, out_pos, out_size); 885 return coder->header_pos < sizeof(coder->header) 886 ? LZMA_OK : LZMA_STREAM_END; 887 } 888 889 assert(0); 890 return LZMA_PROG_ERROR; 891 } 892 893 894 static void 895 stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator) 896 { 897 lzma_stream_coder *coder = coder_ptr; 898 899 // Threads must be killed before the output queue can be freed. 900 threads_end(coder, allocator); 901 lzma_outq_end(&coder->outq, allocator); 902 903 lzma_filters_free(coder->filters, allocator); 904 lzma_filters_free(coder->filters_cache, allocator); 905 906 lzma_next_end(&coder->index_encoder, allocator); 907 lzma_index_end(coder->index, allocator); 908 909 mythread_cond_destroy(&coder->cond); 910 mythread_mutex_destroy(&coder->mutex); 911 912 lzma_free(coder, allocator); 913 return; 914 } 915 916 917 static lzma_ret 918 stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator, 919 const lzma_filter *filters, 920 const lzma_filter *reversed_filters 921 lzma_attribute((__unused__))) 922 { 923 lzma_stream_coder *coder = coder_ptr; 924 925 // Applications shouldn't attempt to change the options when 926 // we are already encoding the Index or Stream Footer. 927 if (coder->sequence > SEQ_BLOCK) 928 return LZMA_PROG_ERROR; 929 930 // For now the threaded encoder doesn't support changing 931 // the options in the middle of a Block. 932 if (coder->thr != NULL) 933 return LZMA_PROG_ERROR; 934 935 // Check if the filter chain seems mostly valid. See the comment 936 // in stream_encoder_mt_init(). 
937 if (lzma_raw_encoder_memusage(filters) == UINT64_MAX) 938 return LZMA_OPTIONS_ERROR; 939 940 // Make a copy to a temporary buffer first. This way the encoder 941 // state stays unchanged if an error occurs in lzma_filters_copy(). 942 lzma_filter temp[LZMA_FILTERS_MAX + 1]; 943 return_if_error(lzma_filters_copy(filters, temp, allocator)); 944 945 // Free the options of the old chain as well as the cache. 946 lzma_filters_free(coder->filters, allocator); 947 lzma_filters_free(coder->filters_cache, allocator); 948 949 // Copy the new filter chain in place. 950 memcpy(coder->filters, temp, sizeof(temp)); 951 952 return LZMA_OK; 953 } 954 955 956 /// Options handling for lzma_stream_encoder_mt_init() and 957 /// lzma_stream_encoder_mt_memusage() 958 static lzma_ret 959 get_options(const lzma_mt *options, lzma_options_easy *opt_easy, 960 const lzma_filter **filters, uint64_t *block_size, 961 uint64_t *outbuf_size_max) 962 { 963 // Validate some of the options. 964 if (options == NULL) 965 return LZMA_PROG_ERROR; 966 967 if (options->flags != 0 || options->threads == 0 968 || options->threads > LZMA_THREADS_MAX) 969 return LZMA_OPTIONS_ERROR; 970 971 if (options->filters != NULL) { 972 // Filter chain was given, use it as is. 973 *filters = options->filters; 974 } else { 975 // Use a preset. 976 if (lzma_easy_preset(opt_easy, options->preset)) 977 return LZMA_OPTIONS_ERROR; 978 979 *filters = opt_easy->filters; 980 } 981 982 // If the Block size is not set, determine it from the filter chain. 983 if (options->block_size > 0) 984 *block_size = options->block_size; 985 else 986 *block_size = lzma_mt_block_size(*filters); 987 988 // UINT64_MAX > BLOCK_SIZE_MAX, so the second condition 989 // should be optimized out by any reasonable compiler. 990 // The second condition should be there in the unlikely event that 991 // the macros change and UINT64_MAX < BLOCK_SIZE_MAX. 
// Validate the Block size and compute the worst-case output buffer size.
	// BLOCK_SIZE_MAX exists to make integer overflows easy to rule out,
	// e.g. options->threads * block_size in
	// lzma_stream_encoder_mt_memusage().
	if (*block_size > BLOCK_SIZE_MAX || *block_size == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Calculate the maximum amount of output that a single output buffer
	// may need to hold. This is the same as the maximum total size of
	// a Block. A return value of zero is treated as a failure.
	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
	if (*outbuf_size_max == 0)
		return LZMA_MEM_ERROR;

	return LZMA_OK;
}


/// Get the total progress of the encoder (installed as next->get_progress).
///
/// The result is the sum of the counters stored in the main structure
/// (coder->progress_in/out) and the current counters of every initialized
/// worker thread.
///
/// \param      coder_ptr       The lzma_stream_coder
/// \param      progress_in     Receives the uncompressed byte count
/// \param      progress_out    Receives the compressed byte count
///                             (initialized to include the Stream Header)
static void
get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
{
	lzma_stream_coder *coder = coder_ptr;

	// Lock coder->mutex to prevent finishing threads from moving their
	// progress info from the worker_thread structure to lzma_stream_coder.
	mythread_sync(coder->mutex) {
		*progress_in = coder->progress_in;
		*progress_out = coder->progress_out;

		// Each worker's counters are read while holding that
		// worker's own mutex.
		for (size_t i = 0; i < coder->threads_initialized; ++i) {
			mythread_sync(coder->threads[i].mutex) {
				*progress_in += coder->threads[i].progress_in;
				*progress_out += coder->threads[i]
						.progress_out;
			}
		}
	}

	return;
}


/// Initialize, or reinitialize for a new Stream, the multithreaded encoder.
///
/// If next->coder already exists, it is reused. The worker_thread array is
/// reallocated only when options->threads differs from coder->threads_max.
/// Otherwise the existing threads are told to stop, waited for, and reused.
///
/// \param      next        The lzma_next_coder to (re)initialize
/// \param      allocator   Custom allocator or NULL
/// \param      options     Multithreading options; get_options() derives
///                         the filter chain and Block size from these
///
/// \return     - LZMA_OK on success
///             - LZMA_OPTIONS_ERROR if lzma_raw_encoder_memusage()
///               rejects the filter chain
///             - LZMA_PROG_ERROR if options->check is out of range
///             - LZMA_UNSUPPORTED_CHECK if options->check isn't supported
///             - LZMA_MEM_ERROR if an allocation fails or if the Block
///               size or the output buffer size doesn't fit in size_t
///             - Any error returned by get_options() or the other
///               helpers called here
static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
		const lzma_mt *options)
{
	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);

	// Get the filter chain, the Block size, and the maximum size of
	// a single output buffer.
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;
	return_if_error(get_options(options, &easy, &filters,
			&block_size, &outbuf_size_max));

#if SIZE_MAX < UINT64_MAX
	// Both values are stored as size_t below, so they must fit.
	if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
		return LZMA_MEM_ERROR;
#endif

	// Validate the filter chain so that we can give an error in this
	// function instead of delaying it to the first call to lzma_code().
	// The memory usage calculation verifies the filter chain as
	// a side effect so we take advantage of that. It's not a perfect
	// check though as raw encoder allows LZMA1 too but such problems
	// will be caught eventually with Block Header encoder.
	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Validate the Check ID.
	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
		return LZMA_PROG_ERROR;

	if (!lzma_check_is_supported(options->check))
		return LZMA_UNSUPPORTED_CHECK;

	// Allocate and initialize the base structure if needed.
	lzma_stream_coder *coder = next->coder;
	if (coder == NULL) {
		coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
		if (coder == NULL)
			return LZMA_MEM_ERROR;

		next->coder = coder;

		// For the mutex and condition variable initializations
		// the error handling has to be done here because
		// stream_encoder_mt_end() doesn't know if they have
		// already been initialized or not.
		if (mythread_mutex_init(&coder->mutex)) {
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		if (mythread_cond_init(&coder->cond)) {
			mythread_mutex_destroy(&coder->mutex);
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		next->code = &stream_encode_mt;
		next->end = &stream_encoder_mt_end;
		next->get_progress = &get_progress;
		next->update = &stream_encoder_mt_update;

		// Start with empty (terminator-only) filter chains so that
		// the lzma_filters_free() calls below operate on valid
		// chains for a brand-new coder.
		coder->filters[0].id = LZMA_VLI_UNKNOWN;
		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
		coder->index_encoder = LZMA_NEXT_CODER_INIT;
		coder->index = NULL;
		memzero(&coder->outq, sizeof(coder->outq));

		// No worker structures exist yet. Since options->threads
		// is nonzero, threads_max == 0 makes the code below
		// allocate the worker_thread array.
		coder->threads = NULL;
		coder->threads_max = 0;
		coder->threads_initialized = 0;
	}

	// Basic initializations
	coder->sequence = SEQ_STREAM_HEADER;
	coder->block_size = (size_t)(block_size);
	coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
	coder->thread_error = LZMA_OK;
	coder->thr = NULL;

	// Allocate the thread-specific base structures.
	assert(options->threads > 0);
	if (coder->threads_max != options->threads) {
		// The thread count differs from the previous one (or this
		// is a new coder). Tear down the old threads and reset the
		// bookkeeping first so that the coder stays consistent
		// even if the allocation below fails.
		threads_end(coder, allocator);

		coder->threads = NULL;
		coder->threads_max = 0;

		coder->threads_initialized = 0;
		coder->threads_free = NULL;

		coder->threads = lzma_alloc(
				options->threads * sizeof(worker_thread),
				allocator);
		if (coder->threads == NULL)
			return LZMA_MEM_ERROR;

		coder->threads_max = options->threads;
	} else {
		// Reuse the old structures and threads. Tell the running
		// threads to stop and wait until they have stopped.
		threads_stop(coder, true);
	}

	// Output queue
	return_if_error(lzma_outq_init(&coder->outq, allocator,
			options->threads));

	// Timeout
	coder->timeout = options->timeout;

	// Free the old filter chain and the cache.
	lzma_filters_free(coder->filters, allocator);
	lzma_filters_free(coder->filters_cache, allocator);

	// Copy the new filter chain.
	return_if_error(lzma_filters_copy(
			filters, coder->filters, allocator));

	// Index: discard any Index from a previous Stream and start
	// a new empty one.
	lzma_index_end(coder->index, allocator);
	coder->index = lzma_index_init(allocator);
	if (coder->index == NULL)
		return LZMA_MEM_ERROR;

	// Stream Header: encode it into coder->header now. header_pos is
	// presumably how much of it has been copied to the output so far.
	coder->stream_flags.version = 0;
	coder->stream_flags.check = options->check;
	return_if_error(lzma_stream_header_encode(
			&coder->stream_flags, coder->header));

	coder->header_pos = 0;

	// Progress info. The Stream Header is counted as output
	// from the start.
	coder->progress_in = 0;
	coder->progress_out = LZMA_STREAM_HEADER_SIZE;

	return LZMA_OK;
}


#ifdef HAVE_SYMBOL_VERSIONS_LINUX
// These are for compatibility with binaries linked against liblzma that
// has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
// Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2
// but it has been added here anyway since someone might misread the
// RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist.
LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha",
	lzma_ret, lzma_stream_encoder_mt_512a)(
		lzma_stream *strm, const lzma_mt *options)
		lzma_nothrow lzma_attr_warn_unused_result
		__attribute__((__alias__("lzma_stream_encoder_mt_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2",
	lzma_ret, lzma_stream_encoder_mt_522)(
		lzma_stream *strm, const lzma_mt *options)
		lzma_nothrow lzma_attr_warn_unused_result
		__attribute__((__alias__("lzma_stream_encoder_mt_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2",
	lzma_ret, lzma_stream_encoder_mt_52)(
		lzma_stream *strm, const lzma_mt *options)
		lzma_nothrow lzma_attr_warn_unused_result;

// Make the definition below define lzma_stream_encoder_mt_52, which is
// the default (@@XZ_5.2) version and the target of the aliases above.
#define lzma_stream_encoder_mt lzma_stream_encoder_mt_52
#endif
/// Public API: initialize strm as a multithreaded .xz Stream encoder.
///
/// The enabled actions are LZMA_RUN, LZMA_FULL_FLUSH, LZMA_FULL_BARRIER,
/// and LZMA_FINISH. LZMA_SYNC_FLUSH is not enabled.
extern LZMA_API(lzma_ret)
lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
{
	lzma_next_strm_init(stream_encoder_mt_init, strm, options);

	strm->internal->supported_actions[LZMA_RUN] = true;
// 	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
	strm->internal->supported_actions[LZMA_FINISH] = true;

	return LZMA_OK;
}


#ifdef HAVE_SYMBOL_VERSIONS_LINUX
// Compatibility symbol versions, set up the same way as for
// lzma_stream_encoder_mt().
LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha",
	uint64_t, lzma_stream_encoder_mt_memusage_512a)(
	const lzma_mt *options) lzma_nothrow lzma_attr_pure
	__attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2",
	uint64_t, lzma_stream_encoder_mt_memusage_522)(
	const lzma_mt *options) lzma_nothrow lzma_attr_pure
	__attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2",
	uint64_t, lzma_stream_encoder_mt_memusage_52)(
	const lzma_mt *options) lzma_nothrow lzma_attr_pure;

#define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52
#endif
// This function name is a monster but it's consistent with the older
// monster names. :-( 31 chars is the max that C99 requires so in that
// sense it's not too long. ;-)
//
// Calculates the memory usage of the multithreaded encoder for the given
// options. It is the sum of:
//   - LZMA_MEMUSAGE_BASE,
//   - the main structure and the worker_thread array,
//   - the input buffers (options->threads * block_size),
//   - the filter encoders of every thread, and
//   - the output queue.
// UINT64_MAX is returned if the options are invalid or if the sum would
// overflow uint64_t.
extern LZMA_API(uint64_t)
lzma_stream_encoder_mt_memusage(const lzma_mt *options)
{
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;

	if (get_options(options, &easy, &filters, &block_size,
			&outbuf_size_max) != LZMA_OK)
		return UINT64_MAX;

	// Memory usage of the input buffers. block_size is limited to
	// BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX) so that this
	// product can't overflow, assuming get_options() limits the
	// thread count to LZMA_THREADS_MAX.
	const uint64_t inbuf_memusage = options->threads * block_size;

	// Memory usage of the filter encoders
	uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
	if (filters_memusage == UINT64_MAX)
		return UINT64_MAX;

	// NOTE(review): this product isn't overflow-checked; presumably
	// the per-thread filter memory usage is small enough that it
	// can't overflow with LZMA_THREADS_MAX threads. Confirm.
	filters_memusage *= options->threads;

	// Memory usage of the output queue
	const uint64_t outq_memusage = lzma_outq_memusage(
			outbuf_size_max, options->threads);
	if (outq_memusage == UINT64_MAX)
		return UINT64_MAX;

	// Sum them with overflow checking.
	uint64_t total_memusage = LZMA_MEMUSAGE_BASE
			+ sizeof(lzma_stream_coder)
			+ options->threads * sizeof(worker_thread);

	if (UINT64_MAX - total_memusage < inbuf_memusage)
		return UINT64_MAX;

	total_memusage += inbuf_memusage;

	if (UINT64_MAX - total_memusage < filters_memusage)
		return UINT64_MAX;

	total_memusage += filters_memusage;

	if (UINT64_MAX - total_memusage < outq_memusage)
		return UINT64_MAX;

	return total_memusage + outq_memusage;
}