///////////////////////////////////////////////////////////////////////////////
//
/// \file       stream_encoder_mt.c
/// \brief      Multithreaded .xz Stream encoder
//
//  Author:     Lasse Collin
//
//  This file has been put into the public domain.
//  You can do whatever you want with this file.
//
///////////////////////////////////////////////////////////////////////////////

#include "filter_encoder.h"
#include "easy_preset.h"
#include "block_encoder.h"
#include "block_buffer_encoder.h"
#include "index_encoder.h"
#include "outqueue.h"


/// Maximum supported block size. This makes it simpler to prevent integer
/// overflows if we are given unusually large block size.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)


/// State of a worker thread.
///
/// NOTE: The numeric order of these values matters: the code uses ordered
/// comparisons such as "state >= THR_STOP" and "state <= THR_FINISH".
typedef enum {
	/// Waiting for work.
	THR_IDLE,

	/// Encoding is in progress.
	THR_RUN,

	/// Encoding is in progress but no more input data will
	/// be read.
	THR_FINISH,

	/// The main thread wants the thread to stop whatever it was doing
	/// but not exit.
	THR_STOP,

	/// The main thread wants the thread to exit. We could use
	/// thread cancellation but since there's a stop state anyway,
	/// this is lazier.
	THR_EXIT,

} worker_state;

typedef struct lzma_stream_coder_s lzma_stream_coder;

typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
	/// Current state of this worker; protected by this->mutex.
	worker_state state;

	/// Input buffer of coder->block_size bytes. The main thread will
	/// put new input into this and update in_size accordingly. Once
	/// no more input is coming, state will be set to THR_FINISH.
	uint8_t *in;

	/// Amount of data available in the input buffer. This is modified
	/// only by the main thread.
	size_t in_size;

	/// Output buffer for this thread. This is set by the main
	/// thread every time a new Block is started with this thread
	/// structure.
	lzma_outbuf *outbuf;

	/// Pointer to the main structure is needed when putting this
	/// thread back to the stack of free threads.
	lzma_stream_coder *coder;

	/// The allocator is set by the main thread. Since a copy of the
	/// pointer is kept here, the application must not change the
	/// allocator before calling lzma_end().
	const lzma_allocator *allocator;

	/// Amount of uncompressed data that has already been compressed.
	uint64_t progress_in;

	/// Amount of compressed data that is ready.
	uint64_t progress_out;

	/// Block encoder
	lzma_next_coder block_encoder;

	/// Compression options for this Block
	lzma_block block_options;

	/// Next structure in the stack of free worker threads.
	worker_thread *next;

	/// Mutex protecting this structure's mutable fields; the matching
	/// condition variable is used for both directions of signaling
	/// between the main thread and this worker.
	mythread_mutex mutex;
	mythread_cond cond;

	/// The ID of this thread is used to join the thread
	/// when it's not needed anymore.
	mythread thread_id;
};


struct lzma_stream_coder_s {
	/// Position in the encoding pipeline:
	/// Stream Header -> Blocks -> Index -> Stream Footer.
	enum {
		SEQ_STREAM_HEADER,
		SEQ_BLOCK,
		SEQ_INDEX,
		SEQ_STREAM_FOOTER,
	} sequence;

	/// Start a new Block every block_size bytes of input unless
	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
	size_t block_size;

	/// The filter chain currently in use
	lzma_filter filters[LZMA_FILTERS_MAX + 1];


	/// Index to hold sizes of the Blocks
	lzma_index *index;

	/// Index encoder
	lzma_next_coder index_encoder;


	/// Stream Flags for encoding the Stream Header and Stream Footer.
	lzma_stream_flags stream_flags;

	/// Buffer to hold Stream Header and Stream Footer.
	uint8_t header[LZMA_STREAM_HEADER_SIZE];

	/// Read position in header[]
	size_t header_pos;


	/// Output buffer queue for compressed data
	lzma_outq outq;


	/// Maximum wait time if cannot use all the input and cannot
	/// fill the output buffer. This is in milliseconds.
	uint32_t timeout;


	/// Error code from a worker thread; LZMA_OK means no error.
	/// Only the first reported error is kept. Protected by mutex.
	lzma_ret thread_error;

	/// Array of allocated thread-specific structures
	worker_thread *threads;

	/// Number of structures in "threads" above. This is also the
	/// number of threads that will be created at maximum.
	uint32_t threads_max;

	/// Number of thread structures that have been initialized, and
	/// thus the number of worker threads actually created so far.
	uint32_t threads_initialized;

	/// Stack of free threads. When a thread finishes, it puts itself
	/// back into this stack. This starts as empty because threads
	/// are created only when actually needed.
	worker_thread *threads_free;

	/// The most recent worker thread to which the main thread writes
	/// the new input from the application.
	worker_thread *thr;


	/// Amount of uncompressed data in Blocks that have already
	/// been finished.
	uint64_t progress_in;

	/// Amount of compressed data in Stream Header + Blocks that
	/// have already been finished.
	uint64_t progress_out;


	/// Mutex and condition variable for main-thread-side state
	/// (thread_error, threads_free, progress info).
	mythread_mutex mutex;
	mythread_cond cond;
};


/// Tell the main thread that something has gone wrong. Only the first
/// error is remembered (thread_error is left untouched if already set),
/// and the main thread is woken up so it can notice the error.
static void
worker_error(worker_thread *thr, lzma_ret ret)
{
	assert(ret != LZMA_OK);
	assert(ret != LZMA_STREAM_END);

	mythread_sync(thr->coder->mutex) {
		if (thr->coder->thread_error == LZMA_OK)
			thr->coder->thread_error = ret;

		mythread_cond_signal(&thr->coder->cond);
	}

	return;
}


/// Encode one Block in a worker thread. On success THR_FINISH is returned.
/// If the main thread requested stopping or exiting, or an error occurred,
/// the state to move to is returned instead (errors have already been
/// reported via worker_error() before returning THR_STOP).
static worker_state
worker_encode(worker_thread *thr, worker_state state)
{
	assert(thr->progress_in == 0);
	assert(thr->progress_out == 0);

	// Set the Block options.
	thr->block_options = (lzma_block){
		.version = 0,
		.check = thr->coder->stream_flags.check,
		.compressed_size = thr->coder->outq.buf_size_max,
		.uncompressed_size = thr->coder->block_size,

		// TODO: To allow changing the filter chain, the filters
		// array must be copied to each worker_thread.
		.filters = thr->coder->filters,
	};

	// Calculate maximum size of the Block Header. This amount is
	// reserved in the beginning of the buffer so that Block Header
	// along with Compressed Size and Uncompressed Size can be
	// written there.
	lzma_ret ret = lzma_block_header_size(&thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Initialize the Block encoder.
	ret = lzma_block_encoder_init(&thr->block_encoder,
			thr->allocator, &thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	size_t in_pos = 0;
	size_t in_size = 0;

	// Skip the Block Header area; it is encoded after the data since
	// only then the Compressed/Uncompressed Size fields are known.
	thr->outbuf->size = thr->block_options.header_size;
	const size_t out_size = thr->coder->outq.buf_size_max;

	do {
		mythread_sync(thr->mutex) {
			// Store in_pos and out_pos into *thr so that
			// an application may read them via
			// lzma_get_progress() to get progress information.
			//
			// NOTE: These aren't updated when the encoding
			// finishes. Instead, the final values are taken
			// later from thr->outbuf.
			thr->progress_in = in_pos;
			thr->progress_out = thr->outbuf->size;

			while (in_size == thr->in_size
					&& thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		// Return if we were asked to stop or exit.
		if (state >= THR_STOP)
			return state;

		lzma_action action = state == THR_FINISH
				? LZMA_FINISH : LZMA_RUN;

		// Limit the amount of input given to the Block encoder
		// at once. This way this thread can react fairly quickly
		// if the main thread wants us to stop or exit.
		static const size_t in_chunk_max = 16384;
		size_t in_limit = in_size;
		if (in_size - in_pos > in_chunk_max) {
			in_limit = in_pos + in_chunk_max;
			action = LZMA_RUN;
		}

		ret = thr->block_encoder.code(
				thr->block_encoder.coder, thr->allocator,
				thr->in, &in_pos, in_limit, thr->outbuf->buf,
				&thr->outbuf->size, out_size, action);
	} while (ret == LZMA_OK && thr->outbuf->size < out_size);

	switch (ret) {
	case LZMA_STREAM_END:
		assert(state == THR_FINISH);

		// Encode the Block Header. By doing it after
		// the compression, we can store the Compressed Size
		// and Uncompressed Size fields.
		ret = lzma_block_header_encode(&thr->block_options,
				thr->outbuf->buf);
		if (ret != LZMA_OK) {
			worker_error(thr, ret);
			return THR_STOP;
		}

		break;

	case LZMA_OK:
		// The data was incompressible. Encode it using uncompressed
		// LZMA2 chunks.
		//
		// First wait that we have gotten all the input.
		mythread_sync(thr->mutex) {
			while (thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		if (state >= THR_STOP)
			return state;

		// Do the encoding. This takes care of the Block Header too.
		thr->outbuf->size = 0;
		ret = lzma_block_uncomp_encode(&thr->block_options,
				thr->in, in_size, thr->outbuf->buf,
				&thr->outbuf->size, out_size);

		// It shouldn't fail.
		if (ret != LZMA_OK) {
			worker_error(thr, LZMA_PROG_ERROR);
			return THR_STOP;
		}

		break;

	default:
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Set the size information that will be read by the main thread
	// to write the Index field.
	thr->outbuf->unpadded_size
			= lzma_block_unpadded_size(&thr->block_options);
	assert(thr->outbuf->unpadded_size != 0);
	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

	return THR_FINISH;
}


/// Worker thread entry point (the function given to mythread_create()).
/// Loops waiting for work, encoding Blocks, and returning itself to the
/// stack of free threads until told to exit.
static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
	worker_thread *thr = thr_ptr;
	worker_state state = THR_IDLE; // Init to silence a warning

	while (true) {
		// Wait for work.
		mythread_sync(thr->mutex) {
			while (true) {
				// The thread is already idle so if we are
				// requested to stop, just set the state.
				if (thr->state == THR_STOP) {
					thr->state = THR_IDLE;
					mythread_cond_signal(&thr->cond);
				}

				state = thr->state;
				if (state != THR_IDLE)
					break;

				mythread_cond_wait(&thr->cond, &thr->mutex);
			}
		}

		assert(state != THR_IDLE);
		assert(state != THR_STOP);

		// THR_RUN or THR_FINISH: encode a Block. worker_encode()
		// returns the state to act on next.
		if (state <= THR_FINISH)
			state = worker_encode(thr, state);

		if (state == THR_EXIT)
			break;

		// Mark the thread as idle unless the main thread has
		// told us to exit. Signal is needed for the case
		// where the main thread is waiting for the threads to stop.
		mythread_sync(thr->mutex) {
			if (thr->state != THR_EXIT) {
				thr->state = THR_IDLE;
				mythread_cond_signal(&thr->cond);
			}
		}

		mythread_sync(thr->coder->mutex) {
			// Mark the output buffer as finished if
			// no errors occurred.
			thr->outbuf->finished = state == THR_FINISH;

			// Update the main progress info.
			thr->coder->progress_in
					+= thr->outbuf->uncompressed_size;
			thr->coder->progress_out += thr->outbuf->size;
			thr->progress_in = 0;
			thr->progress_out = 0;

			// Return this thread to the stack of free threads.
			thr->next = thr->coder->threads_free;
			thr->coder->threads_free = thr;

			mythread_cond_signal(&thr->coder->cond);
		}
	}

	// Exiting, free the resources.
	mythread_mutex_destroy(&thr->mutex);
	mythread_cond_destroy(&thr->cond);

	lzma_next_end(&thr->block_encoder, thr->allocator);
	lzma_free(thr->in, thr->allocator);
	return MYTHREAD_RET_VALUE;
}


/// Make the threads stop but not exit. Optionally wait for them to stop.
static void
threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
{
	// Tell the threads to stop.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_STOP;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	if (!wait_for_threads)
		return;

	// Wait for the threads to settle in the idle state.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			while (coder->threads[i].state != THR_IDLE)
				mythread_cond_wait(&coder->threads[i].cond,
						&coder->threads[i].mutex);
		}
	}

	return;
}


/// Stop the threads and free the resources associated with them.
/// Wait until the threads have exited.
static void
threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_EXIT;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		int ret = mythread_join(coder->threads[i].thread_id);
		assert(ret == 0);
		(void)ret;
	}

	lzma_free(coder->threads, allocator);
	return;
}


/// Initialize a new worker_thread structure and create a new thread.
static lzma_ret
initialize_new_thread(lzma_stream_coder *coder,
		const lzma_allocator *allocator)
{
	// The next uninitialized slot in the preallocated array.
	worker_thread *thr = &coder->threads[coder->threads_initialized];

	thr->in = lzma_alloc(coder->block_size, allocator);
	if (thr->in == NULL)
		return LZMA_MEM_ERROR;

	if (mythread_mutex_init(&thr->mutex))
		goto error_mutex;

	if (mythread_cond_init(&thr->cond))
		goto error_cond;

	thr->state = THR_IDLE;
	thr->allocator = allocator;
	thr->coder = coder;
	thr->progress_in = 0;
	thr->progress_out = 0;
	thr->block_encoder = LZMA_NEXT_CODER_INIT;

	if (mythread_create(&thr->thread_id, &worker_start, thr))
		goto error_thread;

	++coder->threads_initialized;
	coder->thr = thr;

	return LZMA_OK;

// Unwind the partially constructed resources in reverse order.
error_thread:
	mythread_cond_destroy(&thr->cond);

error_cond:
	mythread_mutex_destroy(&thr->mutex);

error_mutex:
	lzma_free(thr->in, allocator);
	return LZMA_MEM_ERROR;
}


/// Pick a worker thread for the main thread to give input to, storing it
/// in coder->thr. coder->thr may still be NULL on an LZMA_OK return if
/// no output buffer (and thus no thread) is available right now; that is
/// not an error.
static lzma_ret
get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
	// If there are no free output subqueues, there is no
	// point to try getting a thread.
	if (!lzma_outq_has_buf(&coder->outq))
		return LZMA_OK;

	// If there is a free structure on the stack, use it.
	mythread_sync(coder->mutex) {
		if (coder->threads_free != NULL) {
			coder->thr = coder->threads_free;
			coder->threads_free = coder->threads_free->next;
		}
	}

	if (coder->thr == NULL) {
		// If there are no uninitialized structures left, return.
		if (coder->threads_initialized == coder->threads_max)
			return LZMA_OK;

		// Initialize a new thread.
		return_if_error(initialize_new_thread(coder, allocator));
	}

	// Reset the parts of the thread state that have to be done
	// in the main thread.
	mythread_sync(coder->thr->mutex) {
		coder->thr->state = THR_RUN;
		coder->thr->in_size = 0;
		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
		mythread_cond_signal(&coder->thr->cond);
	}

	return LZMA_OK;
}


/// Copy the application-supplied input to the current worker thread's
/// input buffer, starting new Blocks (threads) as needed. Returns LZMA_OK
/// also when it simply ran out of free threads or output buffers; in that
/// case *in_pos tells how much input was consumed.
static lzma_ret
stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, lzma_action action)
{
	while (*in_pos < in_size
			|| (coder->thr != NULL && action != LZMA_RUN)) {
		if (coder->thr == NULL) {
			// Get a new thread.
			const lzma_ret ret = get_thread(coder, allocator);
			if (coder->thr == NULL)
				return ret;
		}

		// Copy the input data to thread's buffer.
		size_t thr_in_size = coder->thr->in_size;
		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
				&thr_in_size, coder->block_size);

		// Tell the Block encoder to finish if
		// - it has got block_size bytes of input; or
		// - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
		//   or LZMA_FULL_BARRIER was used.
		//
		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
		const bool finish = thr_in_size == coder->block_size
				|| (*in_pos == in_size && action != LZMA_RUN);

		bool block_error = false;

		mythread_sync(coder->thr->mutex) {
			if (coder->thr->state == THR_IDLE) {
				// Something has gone wrong with the Block
				// encoder. It has set coder->thread_error
				// which we will read a few lines later.
				block_error = true;
			} else {
				// Tell the Block encoder its new amount
				// of input and update the state if needed.
				coder->thr->in_size = thr_in_size;

				if (finish)
					coder->thr->state = THR_FINISH;

				mythread_cond_signal(&coder->thr->cond);
			}
		}

		if (block_error) {
			lzma_ret ret;

			mythread_sync(coder->mutex) {
				ret = coder->thread_error;
			}

			return ret;
		}

		if (finish)
			coder->thr = NULL;
	}

	return LZMA_OK;
}


/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached. Returns true if the timeout expired
/// before anything became possible.
static bool
wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
		bool *has_blocked, bool has_input)
{
	if (coder->timeout != 0 && !*has_blocked) {
		// Every time when stream_encode_mt() is called via
		// lzma_code(), *has_blocked starts as false. We set it
		// to true here and calculate the absolute time when
		// we must return if there's nothing to do.
		//
		// The idea of *has_blocked is to avoid unneeded calls
		// to mythread_condtime_set(), which may do a syscall
		// depending on the operating system.
		*has_blocked = true;
		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
	}

	bool timed_out = false;

	mythread_sync(coder->mutex) {
		// There are four things that we wait for. If one of them
		// becomes possible, we return.
		//  - If there is input left, we need to get a free
		//    worker thread and an output buffer for it.
		//  - Data ready to be read from the output queue.
		//  - A worker thread indicates an error.
		//  - Time out occurs.
		while ((!has_input || coder->threads_free == NULL
					|| !lzma_outq_has_buf(&coder->outq))
				&& !lzma_outq_is_readable(&coder->outq)
				&& coder->thread_error == LZMA_OK
				&& !timed_out) {
			if (coder->timeout != 0)
				timed_out = mythread_cond_timedwait(
						&coder->cond, &coder->mutex,
						wait_abs) != 0;
			else
				mythread_cond_wait(&coder->cond,
						&coder->mutex);
		}
	}

	return timed_out;
}


/// The lzma_code() callback for the threaded encoder: writes the Stream
/// Header, feeds input to worker threads while copying finished Blocks to
/// out[], and finally encodes the Index and the Stream Footer.
static lzma_ret
stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, uint8_t *restrict out,
		size_t *restrict out_pos, size_t out_size, lzma_action action)
{
	lzma_stream_coder *coder = coder_ptr;

	switch (coder->sequence) {
	case SEQ_STREAM_HEADER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		if (coder->header_pos < sizeof(coder->header))
			return LZMA_OK;

		coder->header_pos = 0;
		coder->sequence = SEQ_BLOCK;

	// Fall through

	case SEQ_BLOCK: {
		// Initialized to silence warnings.
		lzma_vli unpadded_size = 0;
		lzma_vli uncompressed_size = 0;
		lzma_ret ret = LZMA_OK;

		// These are for wait_for_work().
		bool has_blocked = false;
		mythread_condtime wait_abs;

		while (true) {
			mythread_sync(coder->mutex) {
				// Check for Block encoder errors.
				ret = coder->thread_error;
				if (ret != LZMA_OK) {
					assert(ret != LZMA_STREAM_END);
					break; // Break out of mythread_sync.
				}

				// Try to read compressed data to out[].
				ret = lzma_outq_read(&coder->outq,
						out, out_pos, out_size,
						&unpadded_size,
						&uncompressed_size);
			}

			if (ret == LZMA_STREAM_END) {
				// End of Block. Add it to the Index.
				ret = lzma_index_append(coder->index,
						allocator, unpadded_size,
						uncompressed_size);
				if (ret != LZMA_OK) {
					threads_stop(coder, false);
					return ret;
				}

				// If we didn't fill the output buffer yet,
				// try to read more data. Maybe the next
				// outbuf has been finished already too.
				if (*out_pos < out_size)
					continue;
			}

			if (ret != LZMA_OK) {
				// coder->thread_error was set.
				threads_stop(coder, false);
				return ret;
			}

			// Try to give uncompressed data to a worker thread.
			ret = stream_encode_in(coder, allocator,
					in, in_pos, in_size, action);
			if (ret != LZMA_OK) {
				threads_stop(coder, false);
				return ret;
			}

			// See if we should wait or return.
			//
			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
			if (*in_pos == in_size) {
				// LZMA_RUN: More data is probably coming
				// so return to let the caller fill the
				// input buffer.
				if (action == LZMA_RUN)
					return LZMA_OK;

				// LZMA_FULL_BARRIER: The same as with
				// LZMA_RUN but tell the caller that the
				// barrier was completed.
				if (action == LZMA_FULL_BARRIER)
					return LZMA_STREAM_END;

				// Finishing or flushing isn't completed until
				// all input data has been encoded and copied
				// to the output buffer.
				if (lzma_outq_is_empty(&coder->outq)) {
					// LZMA_FINISH: Continue to encode
					// the Index field.
					if (action == LZMA_FINISH)
						break;

					// LZMA_FULL_FLUSH: Return to tell
					// the caller that flushing was
					// completed.
					if (action == LZMA_FULL_FLUSH)
						return LZMA_STREAM_END;
				}
			}

			// Return if there is no output space left.
			// This check must be done after testing the input
			// buffer, because we might want to use a different
			// return code.
			if (*out_pos == out_size)
				return LZMA_OK;

			// Neither in nor out has been used completely.
			// Wait until there's something we can do.
			//
			// NOTE(review): LZMA_TIMED_OUT is presumably an
			// internal code that the lzma_code() wrapper maps
			// before the application sees it — confirm in the
			// common liblzma headers.
			if (wait_for_work(coder, &wait_abs, &has_blocked,
					*in_pos < in_size))
				return LZMA_TIMED_OUT;
		}

		// All Blocks have been encoded and the threads have stopped.
		// Prepare to encode the Index field.
		return_if_error(lzma_index_encoder_init(
				&coder->index_encoder, allocator,
				coder->index));
		coder->sequence = SEQ_INDEX;

		// Update the progress info to take the Index and
		// Stream Footer into account. Those are very fast to encode
		// so in terms of progress information they can be thought
		// to be ready to be copied out.
		coder->progress_out += lzma_index_size(coder->index)
				+ LZMA_STREAM_HEADER_SIZE;
	}

	// Fall through

	case SEQ_INDEX: {
		// Call the Index encoder. It doesn't take any input, so
		// those pointers can be NULL.
		const lzma_ret ret = coder->index_encoder.code(
				coder->index_encoder.coder, allocator,
				NULL, NULL, 0,
				out, out_pos, out_size, LZMA_RUN);
		if (ret != LZMA_STREAM_END)
			return ret;

		// Encode the Stream Footer into coder->header[] (the buffer
		// is reused since the Stream Header is long gone by now).
		coder->stream_flags.backward_size
				= lzma_index_size(coder->index);
		if (lzma_stream_footer_encode(&coder->stream_flags,
				coder->header) != LZMA_OK)
			return LZMA_PROG_ERROR;

		coder->sequence = SEQ_STREAM_FOOTER;
	}

	// Fall through

	case SEQ_STREAM_FOOTER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		return coder->header_pos < sizeof(coder->header)
				? LZMA_OK : LZMA_STREAM_END;
	}

	assert(0);
	return LZMA_PROG_ERROR;
}


/// Free everything owned by the coder, including the worker threads.
static void
stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
{
	lzma_stream_coder *coder = coder_ptr;

	// Threads must be killed before the output queue can be freed.
	threads_end(coder, allocator);
	lzma_outq_end(&coder->outq, allocator);

	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
		lzma_free(coder->filters[i].options, allocator);

	lzma_next_end(&coder->index_encoder, allocator);
	lzma_index_end(coder->index, allocator);

	mythread_cond_destroy(&coder->cond);
	mythread_mutex_destroy(&coder->mutex);

	lzma_free(coder, allocator);
	return;
}


/// Options handling for lzma_stream_encoder_mt_init() and
/// lzma_stream_encoder_mt_memusage()
static lzma_ret
get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
		const lzma_filter **filters, uint64_t *block_size,
		uint64_t *outbuf_size_max)
{
	// Validate some of the options.
	if (options == NULL)
		return LZMA_PROG_ERROR;

	if (options->flags != 0 || options->threads == 0
			|| options->threads > LZMA_THREADS_MAX)
		return LZMA_OPTIONS_ERROR;

	if (options->filters != NULL) {
		// Filter chain was given, use it as is.
		*filters = options->filters;
	} else {
		// Use a preset.
		if (lzma_easy_preset(opt_easy, options->preset))
			return LZMA_OPTIONS_ERROR;

		*filters = opt_easy->filters;
	}

	// Block size
	if (options->block_size > 0) {
		if (options->block_size > BLOCK_SIZE_MAX)
			return LZMA_OPTIONS_ERROR;

		*block_size = options->block_size;
	} else {
		// Determine the Block size from the filter chain.
		*block_size = lzma_mt_block_size(*filters);
		if (*block_size == 0)
			return LZMA_OPTIONS_ERROR;

		assert(*block_size <= BLOCK_SIZE_MAX);
	}

	// Calculate the maximum amount output that a single output buffer
	// may need to hold. This is the same as the maximum total size of
	// a Block.
	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
	if (*outbuf_size_max == 0)
		return LZMA_MEM_ERROR;

	return LZMA_OK;
}


/// Collect combined progress info from finished Blocks and from the
/// still-running worker threads.
static void
get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
{
	lzma_stream_coder *coder = coder_ptr;

	// Lock coder->mutex to prevent finishing threads from moving their
	// progress info from the worker_thread structure to lzma_stream_coder.
	mythread_sync(coder->mutex) {
		*progress_in = coder->progress_in;
		*progress_out = coder->progress_out;

		for (size_t i = 0; i < coder->threads_initialized; ++i) {
			mythread_sync(coder->threads[i].mutex) {
				*progress_in += coder->threads[i].progress_in;
				*progress_out += coder->threads[i]
						.progress_out;
			}
		}
	}

	return;
}


static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
		const lzma_mt *options)
{
	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);

	// Get the filter chain.
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;
	return_if_error(get_options(options, &easy, &filters,
			&block_size, &outbuf_size_max));

#if SIZE_MAX < UINT64_MAX
	if (block_size > SIZE_MAX)
		return LZMA_MEM_ERROR;
#endif

	// Validate the filter chain so that we can give an error in this
	// function instead of delaying it to the first call to lzma_code().
	// The memory usage calculation verifies the filter chain as
	// a side effect so we take advantage of that.
	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Validate the Check ID.
	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
		return LZMA_PROG_ERROR;

	if (!lzma_check_is_supported(options->check))
		return LZMA_UNSUPPORTED_CHECK;

	// Allocate and initialize the base structure if needed.
	lzma_stream_coder *coder = next->coder;
	if (coder == NULL) {
		coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
		if (coder == NULL)
			return LZMA_MEM_ERROR;

		next->coder = coder;

		// For the mutex and condition variable initializations
		// the error handling has to be done here because
		// stream_encoder_mt_end() doesn't know if they have
		// already been initialized or not.
		if (mythread_mutex_init(&coder->mutex)) {
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		if (mythread_cond_init(&coder->cond)) {
			mythread_mutex_destroy(&coder->mutex);
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		next->code = &stream_encode_mt;
		next->end = &stream_encoder_mt_end;
		next->get_progress = &get_progress;
//		next->update = &stream_encoder_mt_update;

		coder->filters[0].id = LZMA_VLI_UNKNOWN;
		coder->index_encoder = LZMA_NEXT_CODER_INIT;
		coder->index = NULL;
		memzero(&coder->outq, sizeof(coder->outq));
		coder->threads = NULL;
		coder->threads_max = 0;
		coder->threads_initialized = 0;
	}

	// Basic initializations
	coder->sequence = SEQ_STREAM_HEADER;
	coder->block_size = (size_t)(block_size);
	coder->thread_error = LZMA_OK;
	coder->thr = NULL;

	// Allocate the thread-specific base structures.
	assert(options->threads > 0);
	if (coder->threads_max != options->threads) {
		// Thread count changed: tear down the old threads and
		// allocate a fresh array of worker_thread structures.
		threads_end(coder, allocator);

		coder->threads = NULL;
		coder->threads_max = 0;

		coder->threads_initialized = 0;
		coder->threads_free = NULL;

		coder->threads = lzma_alloc(
				options->threads * sizeof(worker_thread),
				allocator);
		if (coder->threads == NULL)
			return LZMA_MEM_ERROR;

		coder->threads_max = options->threads;
	} else {
		// Reuse the old structures and threads. Tell the running
		// threads to stop and wait until they have stopped.
		threads_stop(coder, true);
	}

	// Output queue
	return_if_error(lzma_outq_init(&coder->outq, allocator,
			outbuf_size_max, options->threads));

	// Timeout
	coder->timeout = options->timeout;

	// Free the old filter chain and copy the new one.
	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
		lzma_free(coder->filters[i].options, allocator);

	// Mark it as empty so that it is in a safe state in case
	// lzma_filters_copy() fails.
	coder->filters[0].id = LZMA_VLI_UNKNOWN;

	return_if_error(lzma_filters_copy(
			filters, coder->filters, allocator));

	// Index
	lzma_index_end(coder->index, allocator);
	coder->index = lzma_index_init(allocator);
	if (coder->index == NULL)
		return LZMA_MEM_ERROR;

	// Stream Header
	coder->stream_flags.version = 0;
	coder->stream_flags.check = options->check;
	return_if_error(lzma_stream_header_encode(
			&coder->stream_flags, coder->header));

	coder->header_pos = 0;

	// Progress info
	coder->progress_in = 0;
	coder->progress_out = LZMA_STREAM_HEADER_SIZE;

	return LZMA_OK;
}


#ifdef HAVE_SYMBOL_VERSIONS_LINUX
// These are for compatibility with binaries linked against liblzma that
// has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
// Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2
// but it has been added here anyway since someone might misread the
// RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist.
LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha",
		lzma_ret, lzma_stream_encoder_mt_512a)(
		lzma_stream *strm, const lzma_mt *options)
		lzma_nothrow lzma_attr_warn_unused_result
		__attribute__((__alias__("lzma_stream_encoder_mt_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2",
		lzma_ret, lzma_stream_encoder_mt_522)(
		lzma_stream *strm, const lzma_mt *options)
		lzma_nothrow lzma_attr_warn_unused_result
		__attribute__((__alias__("lzma_stream_encoder_mt_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2",
		lzma_ret, lzma_stream_encoder_mt_52)(
		lzma_stream *strm, const lzma_mt *options)
		lzma_nothrow lzma_attr_warn_unused_result;

#define lzma_stream_encoder_mt lzma_stream_encoder_mt_52
#endif
// Public API: initialize *strm as a multithreaded .xz Stream encoder
// using the given lzma_mt options.
extern LZMA_API(lzma_ret)
lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
{
	lzma_next_strm_init(stream_encoder_mt_init, strm, options);

	strm->internal->supported_actions[LZMA_RUN] = true;
//	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
	strm->internal->supported_actions[LZMA_FINISH] = true;

	return LZMA_OK;
}


#ifdef HAVE_SYMBOL_VERSIONS_LINUX
LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha",
		uint64_t, lzma_stream_encoder_mt_memusage_512a)(
		const lzma_mt *options) lzma_nothrow lzma_attr_pure
		__attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2",
		uint64_t, lzma_stream_encoder_mt_memusage_522)(
		const lzma_mt *options) lzma_nothrow lzma_attr_pure
		__attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2",
		uint64_t, lzma_stream_encoder_mt_memusage_52)(
		const lzma_mt *options) lzma_nothrow lzma_attr_pure;

#define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52
#endif
// This function name is a monster but it's consistent with the older
// monster names. :-( 31 chars is the max that C99 requires so in that
// sense it's not too long. ;-)
//
// Returns the total memory usage estimate for the given multithreaded
// encoder options, or UINT64_MAX on invalid options or overflow.
extern LZMA_API(uint64_t)
lzma_stream_encoder_mt_memusage(const lzma_mt *options)
{
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;

	if (get_options(options, &easy, &filters, &block_size,
			&outbuf_size_max) != LZMA_OK)
		return UINT64_MAX;

	// Memory usage of the input buffers
	// (no overflow: threads <= LZMA_THREADS_MAX and
	// block_size <= BLOCK_SIZE_MAX = UINT64_MAX / LZMA_THREADS_MAX)
	const uint64_t inbuf_memusage = options->threads * block_size;

	// Memory usage of the filter encoders
	uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
	if (filters_memusage == UINT64_MAX)
		return UINT64_MAX;

	filters_memusage *= options->threads;

	// Memory usage of the output queue
	const uint64_t outq_memusage = lzma_outq_memusage(
			outbuf_size_max, options->threads);
	if (outq_memusage == UINT64_MAX)
		return UINT64_MAX;

	// Sum them with overflow checking.
	uint64_t total_memusage = LZMA_MEMUSAGE_BASE
			+ sizeof(lzma_stream_coder)
			+ options->threads * sizeof(worker_thread);

	if (UINT64_MAX - total_memusage < inbuf_memusage)
		return UINT64_MAX;

	total_memusage += inbuf_memusage;

	if (UINT64_MAX - total_memusage < filters_memusage)
		return UINT64_MAX;

	total_memusage += filters_memusage;

	if (UINT64_MAX - total_memusage < outq_memusage)
		return UINT64_MAX;

	return total_memusage + outq_memusage;
}