///////////////////////////////////////////////////////////////////////////////
//
/// \file       stream_encoder_mt.c
/// \brief      Multithreaded .xz Stream encoder
//
//  Author:     Lasse Collin
//
//  This file has been put into the public domain.
//  You can do whatever you want with this file.
//
///////////////////////////////////////////////////////////////////////////////

#include "filter_encoder.h"
#include "easy_preset.h"
#include "block_encoder.h"
#include "block_buffer_encoder.h"
#include "index_encoder.h"
#include "outqueue.h"


/// Maximum supported block size. This makes it simpler to prevent integer
/// overflows if we are given unusually large block size.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)


typedef enum {
	/// Waiting for work.
	THR_IDLE,

	/// Encoding is in progress.
	THR_RUN,

	/// Encoding is in progress but no more input data will
	/// be read.
	THR_FINISH,

	/// The main thread wants the thread to stop whatever it was doing
	/// but not exit.
	THR_STOP,

	/// The main thread wants the thread to exit. We could use
	/// cancellation but since there's THR_STOP anyway, this is lazier.
	THR_EXIT,

} worker_state;


typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
	/// Current state of this thread; protected by "mutex" below.
	worker_state state;

	/// Input buffer of coder->block_size bytes. The main thread will
	/// put new input into this and update in_size accordingly. Once
	/// no more input is coming, state will be set to THR_FINISH.
	uint8_t *in;

	/// Amount of data available in the input buffer. This is modified
	/// only by the main thread.
	size_t in_size;

	/// Output buffer for this thread. This is set by the main
	/// thread every time a new Block is started with this thread
	/// structure.
	lzma_outbuf *outbuf;

	/// Pointer to the main structure is needed when putting this
	/// thread back to the stack of free threads.
	lzma_coder *coder;

	/// The allocator is set by the main thread. Since a copy of the
	/// pointer is kept here, the application must not change the
	/// allocator before calling lzma_end().
	const lzma_allocator *allocator;

	/// Amount of uncompressed data that has already been compressed.
	uint64_t progress_in;

	/// Amount of compressed data that is ready.
	uint64_t progress_out;

	/// Block encoder
	lzma_next_coder block_encoder;

	/// Compression options for this Block
	lzma_block block_options;

	/// Next structure in the stack of free worker threads.
	worker_thread *next;

	/// Mutex and condition variable for signaling between this worker
	/// thread and the main thread, protecting "state" and "in_size".
	mythread_mutex mutex;
	mythread_cond cond;

	/// The ID of this thread is used to join the thread
	/// when it's not needed anymore.
	mythread thread_id;
};


struct lzma_coder_s {
	/// Position in the encoding pipeline: Stream Header first,
	/// then Blocks, then the Index, and finally the Stream Footer.
	enum {
		SEQ_STREAM_HEADER,
		SEQ_BLOCK,
		SEQ_INDEX,
		SEQ_STREAM_FOOTER,
	} sequence;

	/// Start a new Block every block_size bytes of input unless
	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
	size_t block_size;

	/// The filter chain currently in use
	lzma_filter filters[LZMA_FILTERS_MAX + 1];


	/// Index to hold sizes of the Blocks
	lzma_index *index;

	/// Index encoder
	lzma_next_coder index_encoder;


	/// Stream Flags for encoding the Stream Header and Stream Footer.
	lzma_stream_flags stream_flags;

	/// Buffer to hold Stream Header and Stream Footer.
	uint8_t header[LZMA_STREAM_HEADER_SIZE];

	/// Read position in header[]
	size_t header_pos;


	/// Output buffer queue for compressed data
	lzma_outq outq;


	/// Maximum wait time if cannot use all the input and cannot
	/// fill the output buffer. This is in milliseconds.
	uint32_t timeout;


	/// Error code from a worker thread
	lzma_ret thread_error;

	/// Array of allocated thread-specific structures
	worker_thread *threads;

	/// Number of structures in "threads" above. This is also the
	/// number of threads that will be created at maximum.
	uint32_t threads_max;

	/// Number of thread structures that have been initialized, and
	/// thus the number of worker threads actually created so far.
	uint32_t threads_initialized;

	/// Stack of free threads. When a thread finishes, it puts itself
	/// back into this stack. This starts as empty because threads
	/// are created only when actually needed.
	worker_thread *threads_free;

	/// The most recent worker thread to which the main thread writes
	/// the new input from the application.
	worker_thread *thr;


	/// Amount of uncompressed data in Blocks that have already
	/// been finished.
	uint64_t progress_in;

	/// Amount of compressed data in Stream Header + Blocks that
	/// have already been finished.
	uint64_t progress_out;


	/// Mutex and condition variable protecting thread_error,
	/// threads_free, and the finished-Block progress info above.
	mythread_mutex mutex;
	mythread_cond cond;
};


/// Tell the main thread that something has gone wrong.
static void
worker_error(worker_thread *thr, lzma_ret ret)
{
	assert(ret != LZMA_OK);
	assert(ret != LZMA_STREAM_END);

	mythread_sync(thr->coder->mutex) {
		// Only the first error is remembered; later errors are
		// likely to be consequences of the first one.
		if (thr->coder->thread_error == LZMA_OK)
			thr->coder->thread_error = ret;

		mythread_cond_signal(&thr->coder->cond);
	}

	return;
}


/// Encode one Block using this worker thread's Block encoder.
///
/// Returns the state the worker should move to next: THR_FINISH when the
/// Block was successfully encoded, THR_STOP after an error has been
/// reported via worker_error(), or the state requested by the main thread
/// (THR_STOP or THR_EXIT) if encoding was interrupted.
static worker_state
worker_encode(worker_thread *thr, worker_state state)
{
	assert(thr->progress_in == 0);
	assert(thr->progress_out == 0);

	// Set the Block options.
	thr->block_options = (lzma_block){
		.version = 0,
		.check = thr->coder->stream_flags.check,
		.compressed_size = thr->coder->outq.buf_size_max,
		.uncompressed_size = thr->coder->block_size,

		// TODO: To allow changing the filter chain, the filters
		// array must be copied to each worker_thread.
		.filters = thr->coder->filters,
	};

	// Calculate maximum size of the Block Header. This amount is
	// reserved in the beginning of the buffer so that Block Header
	// along with Compressed Size and Uncompressed Size can be
	// written there.
	lzma_ret ret = lzma_block_header_size(&thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Initialize the Block encoder.
	ret = lzma_block_encoder_init(&thr->block_encoder,
			thr->allocator, &thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	size_t in_pos = 0;
	size_t in_size = 0;

	thr->outbuf->size = thr->block_options.header_size;
	const size_t out_size = thr->coder->outq.buf_size_max;

	do {
		mythread_sync(thr->mutex) {
			// Store in_pos and out_pos into *thr so that
			// an application may read them via
			// lzma_get_progress() to get progress information.
			//
			// NOTE: These aren't updated when the encoding
			// finishes. Instead, the final values are taken
			// later from thr->outbuf.
			thr->progress_in = in_pos;
			thr->progress_out = thr->outbuf->size;

			// Sleep until there is new input or the main
			// thread changes our state.
			while (in_size == thr->in_size
					&& thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		// Return if we were asked to stop or exit.
		if (state >= THR_STOP)
			return state;

		lzma_action action = state == THR_FINISH
				? LZMA_FINISH : LZMA_RUN;

		// Limit the amount of input given to the Block encoder
		// at once. This way this thread can react fairly quickly
		// if the main thread wants us to stop or exit.
		static const size_t in_chunk_max = 16384;
		size_t in_limit = in_size;
		if (in_size - in_pos > in_chunk_max) {
			in_limit = in_pos + in_chunk_max;
			action = LZMA_RUN;
		}

		ret = thr->block_encoder.code(
				thr->block_encoder.coder, thr->allocator,
				thr->in, &in_pos, in_limit, thr->outbuf->buf,
				&thr->outbuf->size, out_size, action);
	} while (ret == LZMA_OK && thr->outbuf->size < out_size);

	switch (ret) {
	case LZMA_STREAM_END:
		assert(state == THR_FINISH);

		// Encode the Block Header. By doing it after
		// the compression, we can store the Compressed Size
		// and Uncompressed Size fields.
		ret = lzma_block_header_encode(&thr->block_options,
				thr->outbuf->buf);
		if (ret != LZMA_OK) {
			worker_error(thr, ret);
			return THR_STOP;
		}

		break;

	case LZMA_OK:
		// The data was incompressible. Encode it using uncompressed
		// LZMA2 chunks.
		//
		// First wait that we have gotten all the input.
		mythread_sync(thr->mutex) {
			while (thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		if (state >= THR_STOP)
			return state;

		// Do the encoding. This takes care of the Block Header too.
		thr->outbuf->size = 0;
		ret = lzma_block_uncomp_encode(&thr->block_options,
				thr->in, in_size, thr->outbuf->buf,
				&thr->outbuf->size, out_size);

		// It shouldn't fail.
		if (ret != LZMA_OK) {
			worker_error(thr, LZMA_PROG_ERROR);
			return THR_STOP;
		}

		break;

	default:
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Set the size information that will be read by the main thread
	// to write the Index field.
	thr->outbuf->unpadded_size
			= lzma_block_unpadded_size(&thr->block_options);
	assert(thr->outbuf->unpadded_size != 0);
	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

	return THR_FINISH;
}


/// Entry point of a worker thread: wait for work, encode one Block at
/// a time, publish the result, and return this structure to the stack
/// of free threads, until the main thread requests THR_EXIT.
static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
	worker_thread *thr = thr_ptr;
	worker_state state = THR_IDLE; // Init to silence a warning

	while (true) {
		// Wait for work.
		mythread_sync(thr->mutex) {
			while (true) {
				// The thread is already idle so if we are
				// requested to stop, just set the state.
				if (thr->state == THR_STOP) {
					thr->state = THR_IDLE;
					mythread_cond_signal(&thr->cond);
				}

				state = thr->state;
				if (state != THR_IDLE)
					break;

				mythread_cond_wait(&thr->cond, &thr->mutex);
			}
		}

		assert(state != THR_IDLE);
		assert(state != THR_STOP);

		if (state <= THR_FINISH)
			state = worker_encode(thr, state);

		if (state == THR_EXIT)
			break;

		// Mark the thread as idle unless the main thread has
		// told us to exit. Signal is needed for the case
		// where the main thread is waiting for the threads to stop.
		mythread_sync(thr->mutex) {
			if (thr->state != THR_EXIT) {
				thr->state = THR_IDLE;
				mythread_cond_signal(&thr->cond);
			}
		}

		mythread_sync(thr->coder->mutex) {
			// Mark the output buffer as finished if
			// no errors occurred.
			thr->outbuf->finished = state == THR_FINISH;

			// Update the main progress info.
			thr->coder->progress_in
					+= thr->outbuf->uncompressed_size;
			thr->coder->progress_out += thr->outbuf->size;
			thr->progress_in = 0;
			thr->progress_out = 0;

			// Return this thread to the stack of free threads.
			thr->next = thr->coder->threads_free;
			thr->coder->threads_free = thr;

			mythread_cond_signal(&thr->coder->cond);
		}
	}

	// Exiting, free the resources.
	mythread_mutex_destroy(&thr->mutex);
	mythread_cond_destroy(&thr->cond);

	lzma_next_end(&thr->block_encoder, thr->allocator);
	lzma_free(thr->in, thr->allocator);
	return MYTHREAD_RET_VALUE;
}


/// Make the threads stop but not exit. Optionally wait for them to stop.
static void
threads_stop(lzma_coder *coder, bool wait_for_threads)
{
	// Tell the threads to stop.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_STOP;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	if (!wait_for_threads)
		return;

	// Wait for the threads to settle in the idle state.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			while (coder->threads[i].state != THR_IDLE)
				mythread_cond_wait(&coder->threads[i].cond,
						&coder->threads[i].mutex);
		}
	}

	return;
}


/// Stop the threads and free the resources associated with them.
/// Wait until the threads have exited.
static void
threads_end(lzma_coder *coder, const lzma_allocator *allocator)
{
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_EXIT;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		int ret = mythread_join(coder->threads[i].thread_id);
		assert(ret == 0);
		(void)ret;
	}

	lzma_free(coder->threads, allocator);
	return;
}


/// Initialize a new worker_thread structure and create a new thread.
static lzma_ret
initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator)
{
	worker_thread *thr = &coder->threads[coder->threads_initialized];

	thr->in = lzma_alloc(coder->block_size, allocator);
	if (thr->in == NULL)
		return LZMA_MEM_ERROR;

	if (mythread_mutex_init(&thr->mutex))
		goto error_mutex;

	if (mythread_cond_init(&thr->cond))
		goto error_cond;

	thr->state = THR_IDLE;
	thr->allocator = allocator;
	thr->coder = coder;
	thr->progress_in = 0;
	thr->progress_out = 0;
	thr->block_encoder = LZMA_NEXT_CODER_INIT;

	if (mythread_create(&thr->thread_id, &worker_start, thr))
		goto error_thread;

	++coder->threads_initialized;
	coder->thr = thr;

	return LZMA_OK;

error_thread:
	mythread_cond_destroy(&thr->cond);

error_cond:
	mythread_mutex_destroy(&thr->mutex);

error_mutex:
	lzma_free(thr->in, allocator);
	return LZMA_MEM_ERROR;
}


/// Pick a thread for the main thread to use, creating a new thread if
/// needed. On success coder->thr points to the chosen thread; coder->thr
/// may also legitimately remain NULL with LZMA_OK if no free output
/// buffer or thread structure is currently available.
static lzma_ret
get_thread(lzma_coder *coder, const lzma_allocator *allocator)
{
	// If there are no free output subqueues, there is no
	// point to try getting a thread.
	if (!lzma_outq_has_buf(&coder->outq))
		return LZMA_OK;

	// If there is a free structure on the stack, use it.
	mythread_sync(coder->mutex) {
		if (coder->threads_free != NULL) {
			coder->thr = coder->threads_free;
			coder->threads_free = coder->threads_free->next;
		}
	}

	if (coder->thr == NULL) {
		// If there are no uninitialized structures left, return.
		if (coder->threads_initialized == coder->threads_max)
			return LZMA_OK;

		// Initialize a new thread.
		return_if_error(initialize_new_thread(coder, allocator));
	}

	// Reset the parts of the thread state that have to be done
	// in the main thread.
	mythread_sync(coder->thr->mutex) {
		coder->thr->state = THR_RUN;
		coder->thr->in_size = 0;
		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
		mythread_cond_signal(&coder->thr->cond);
	}

	return LZMA_OK;
}


/// Copy data from in[] to the input buffers of worker threads, starting
/// new Blocks as needed. Returns the error code if a worker thread has
/// reported one via worker_error().
static lzma_ret
stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, lzma_action action)
{
	while (*in_pos < in_size
			|| (coder->thr != NULL && action != LZMA_RUN)) {
		if (coder->thr == NULL) {
			// Get a new thread.
			const lzma_ret ret = get_thread(coder, allocator);
			if (coder->thr == NULL)
				return ret;
		}

		// Copy the input data to thread's buffer.
		size_t thr_in_size = coder->thr->in_size;
		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
				&thr_in_size, coder->block_size);

		// Tell the Block encoder to finish if
		// - it has got block_size bytes of input; or
		// - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
		//   or LZMA_FULL_BARRIER was used.
		//
		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
		const bool finish = thr_in_size == coder->block_size
				|| (*in_pos == in_size && action != LZMA_RUN);

		bool block_error = false;

		mythread_sync(coder->thr->mutex) {
			if (coder->thr->state == THR_IDLE) {
				// Something has gone wrong with the Block
				// encoder. It has set coder->thread_error
				// which we will read a few lines later.
				block_error = true;
			} else {
				// Tell the Block encoder its new amount
				// of input and update the state if needed.
				coder->thr->in_size = thr_in_size;

				if (finish)
					coder->thr->state = THR_FINISH;

				mythread_cond_signal(&coder->thr->cond);
			}
		}

		if (block_error) {
			lzma_ret ret;

			mythread_sync(coder->mutex) {
				ret = coder->thread_error;
			}

			return ret;
		}

		// The current thread has all the input it needs; the next
		// iteration will pick a new thread for the next Block.
		if (finish)
			coder->thr = NULL;
	}

	return LZMA_OK;
}


/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached.
static bool
wait_for_work(lzma_coder *coder, mythread_condtime *wait_abs,
		bool *has_blocked, bool has_input)
{
	if (coder->timeout != 0 && !*has_blocked) {
		// Every time when stream_encode_mt() is called via
		// lzma_code(), *has_blocked starts as false. We set it
		// to true here and calculate the absolute time when
		// we must return if there's nothing to do.
		//
		// The idea of *has_blocked is to avoid unneeded calls
		// to mythread_condtime_set(), which may do a syscall
		// depending on the operating system.
		*has_blocked = true;
		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
	}

	bool timed_out = false;

	mythread_sync(coder->mutex) {
		// There are four things that we wait for. If one of them
		// becomes possible, we return.
		//  - If there is input left, we need to get a free
		//    worker thread and an output buffer for it.
		//  - Data ready to be read from the output queue.
		//  - A worker thread indicates an error.
		//  - Time out occurs.
		while ((!has_input || coder->threads_free == NULL
					|| !lzma_outq_has_buf(&coder->outq))
				&& !lzma_outq_is_readable(&coder->outq)
				&& coder->thread_error == LZMA_OK
				&& !timed_out) {
			if (coder->timeout != 0)
				timed_out = mythread_cond_timedwait(
						&coder->cond, &coder->mutex,
						wait_abs) != 0;
			else
				mythread_cond_wait(&coder->cond,
						&coder->mutex);
		}
	}

	return timed_out;
}


/// The main coding function: copy out the Stream Header, feed input to
/// the worker threads and copy their finished Blocks to out[], and
/// finally encode the Index and the Stream Footer.
static lzma_ret
stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, uint8_t *restrict out,
		size_t *restrict out_pos, size_t out_size, lzma_action action)
{
	switch (coder->sequence) {
	case SEQ_STREAM_HEADER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		if (coder->header_pos < sizeof(coder->header))
			return LZMA_OK;

		coder->header_pos = 0;
		coder->sequence = SEQ_BLOCK;

	// Fall through

	case SEQ_BLOCK: {
		// Initialized to silence warnings.
		lzma_vli unpadded_size = 0;
		lzma_vli uncompressed_size = 0;
		lzma_ret ret = LZMA_OK;

		// These are for wait_for_work().
		bool has_blocked = false;
		mythread_condtime wait_abs;

		while (true) {
			mythread_sync(coder->mutex) {
				// Check for Block encoder errors.
				ret = coder->thread_error;
				if (ret != LZMA_OK) {
					assert(ret != LZMA_STREAM_END);
					break;
				}

				// Try to read compressed data to out[].
				ret = lzma_outq_read(&coder->outq,
						out, out_pos, out_size,
						&unpadded_size,
						&uncompressed_size);
			}

			if (ret == LZMA_STREAM_END) {
				// End of Block. Add it to the Index.
				ret = lzma_index_append(coder->index,
						allocator, unpadded_size,
						uncompressed_size);

				// If we didn't fill the output buffer yet,
				// try to read more data. Maybe the next
				// outbuf has been finished already too.
				if (*out_pos < out_size)
					continue;
			}

			if (ret != LZMA_OK) {
				// coder->thread_error was set or
				// lzma_index_append() failed.
				threads_stop(coder, false);
				return ret;
			}

			// Try to give uncompressed data to a worker thread.
			ret = stream_encode_in(coder, allocator,
					in, in_pos, in_size, action);
			if (ret != LZMA_OK) {
				threads_stop(coder, false);
				return ret;
			}

			// See if we should wait or return.
			//
			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
			if (*in_pos == in_size) {
				// LZMA_RUN: More data is probably coming
				// so return to let the caller fill the
				// input buffer.
				if (action == LZMA_RUN)
					return LZMA_OK;

				// LZMA_FULL_BARRIER: The same as with
				// LZMA_RUN but tell the caller that the
				// barrier was completed.
				if (action == LZMA_FULL_BARRIER)
					return LZMA_STREAM_END;

				// Finishing or flushing isn't completed until
				// all input data has been encoded and copied
				// to the output buffer.
				if (lzma_outq_is_empty(&coder->outq)) {
					// LZMA_FINISH: Continue to encode
					// the Index field.
					if (action == LZMA_FINISH)
						break;

					// LZMA_FULL_FLUSH: Return to tell
					// the caller that flushing was
					// completed.
					if (action == LZMA_FULL_FLUSH)
						return LZMA_STREAM_END;
				}
			}

			// Return if there is no output space left.
			// This check must be done after testing the input
			// buffer, because we might want to use a different
			// return code.
			if (*out_pos == out_size)
				return LZMA_OK;

			// Neither in nor out has been used completely.
			// Wait until there's something we can do.
			if (wait_for_work(coder, &wait_abs, &has_blocked,
					*in_pos < in_size))
				return LZMA_TIMED_OUT;
		}

		// All Blocks have been encoded and the threads have stopped.
		// Prepare to encode the Index field.
		return_if_error(lzma_index_encoder_init(
				&coder->index_encoder, allocator,
				coder->index));
		coder->sequence = SEQ_INDEX;

		// Update the progress info to take the Index and
		// Stream Footer into account. Those are very fast to encode
		// so in terms of progress information they can be thought
		// to be ready to be copied out.
		coder->progress_out += lzma_index_size(coder->index)
				+ LZMA_STREAM_HEADER_SIZE;
	}

	// Fall through

	case SEQ_INDEX: {
		// Call the Index encoder. It doesn't take any input, so
		// those pointers can be NULL.
		const lzma_ret ret = coder->index_encoder.code(
				coder->index_encoder.coder, allocator,
				NULL, NULL, 0,
				out, out_pos, out_size, LZMA_RUN);
		if (ret != LZMA_STREAM_END)
			return ret;

		// Encode the Stream Footer into coder->header.
		coder->stream_flags.backward_size
				= lzma_index_size(coder->index);
		if (lzma_stream_footer_encode(&coder->stream_flags,
				coder->header) != LZMA_OK)
			return LZMA_PROG_ERROR;

		coder->sequence = SEQ_STREAM_FOOTER;
	}

	// Fall through

	case SEQ_STREAM_FOOTER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		return coder->header_pos < sizeof(coder->header)
				? LZMA_OK : LZMA_STREAM_END;
	}

	assert(0);
	return LZMA_PROG_ERROR;
}


/// Free all resources allocated for the coder, including the
/// worker threads.
static void
stream_encoder_mt_end(lzma_coder *coder, const lzma_allocator *allocator)
{
	// Threads must be killed before the output queue can be freed.
	threads_end(coder, allocator);
	lzma_outq_end(&coder->outq, allocator);

	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
		lzma_free(coder->filters[i].options, allocator);

	lzma_next_end(&coder->index_encoder, allocator);
	lzma_index_end(coder->index, allocator);

	mythread_cond_destroy(&coder->cond);
	mythread_mutex_destroy(&coder->mutex);

	lzma_free(coder, allocator);
	return;
}


/// Options handling for lzma_stream_encoder_mt_init() and
/// lzma_stream_encoder_mt_memusage()
static lzma_ret
get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
		const lzma_filter **filters, uint64_t *block_size,
		uint64_t *outbuf_size_max)
{
	// Validate some of the options.
	if (options == NULL)
		return LZMA_PROG_ERROR;

	if (options->flags != 0 || options->threads == 0
			|| options->threads > LZMA_THREADS_MAX)
		return LZMA_OPTIONS_ERROR;

	if (options->filters != NULL) {
		// Filter chain was given, use it as is.
		*filters = options->filters;
	} else {
		// Use a preset.
		if (lzma_easy_preset(opt_easy, options->preset))
			return LZMA_OPTIONS_ERROR;

		*filters = opt_easy->filters;
	}

	// Block size
	if (options->block_size > 0) {
		if (options->block_size > BLOCK_SIZE_MAX)
			return LZMA_OPTIONS_ERROR;

		*block_size = options->block_size;
	} else {
		// Determine the Block size from the filter chain.
		*block_size = lzma_mt_block_size(*filters);
		if (*block_size == 0)
			return LZMA_OPTIONS_ERROR;

		assert(*block_size <= BLOCK_SIZE_MAX);
	}

	// Calculate the maximum amount of output that a single output buffer
	// may need to hold. This is the same as the maximum total size of
	// a Block.
	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
	if (*outbuf_size_max == 0)
		return LZMA_MEM_ERROR;

	return LZMA_OK;
}


/// Combine the finished-Block progress info in *coder with the per-thread
/// progress of the Blocks still being encoded.
static void
get_progress(lzma_coder *coder, uint64_t *progress_in, uint64_t *progress_out)
{
	// Lock coder->mutex to prevent finishing threads from moving their
	// progress info from the worker_thread structure to lzma_coder.
	mythread_sync(coder->mutex) {
		*progress_in = coder->progress_in;
		*progress_out = coder->progress_out;

		for (size_t i = 0; i < coder->threads_initialized; ++i) {
			mythread_sync(coder->threads[i].mutex) {
				*progress_in += coder->threads[i].progress_in;
				*progress_out += coder->threads[i]
						.progress_out;
			}
		}
	}

	return;
}


static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
		const lzma_mt *options)
{
	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);

	// Get the filter chain.
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;
	return_if_error(get_options(options, &easy, &filters,
			&block_size, &outbuf_size_max));

#if SIZE_MAX < UINT64_MAX
	if (block_size > SIZE_MAX)
		return LZMA_MEM_ERROR;
#endif

	// Validate the filter chain so that we can give an error in this
	// function instead of delaying it to the first call to lzma_code().
	// The memory usage calculation verifies the filter chain as
	// a side effect so we take advantage of that.
	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Validate the Check ID.
	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
		return LZMA_PROG_ERROR;

	if (!lzma_check_is_supported(options->check))
		return LZMA_UNSUPPORTED_CHECK;

	// Allocate and initialize the base structure if needed.
	if (next->coder == NULL) {
		next->coder = lzma_alloc(sizeof(lzma_coder), allocator);
		if (next->coder == NULL)
			return LZMA_MEM_ERROR;

		// For the mutex and condition variable initializations
		// the error handling has to be done here because
		// stream_encoder_mt_end() doesn't know if they have
		// already been initialized or not.
		if (mythread_mutex_init(&next->coder->mutex)) {
			lzma_free(next->coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		if (mythread_cond_init(&next->coder->cond)) {
			mythread_mutex_destroy(&next->coder->mutex);
			lzma_free(next->coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		next->code = &stream_encode_mt;
		next->end = &stream_encoder_mt_end;
		next->get_progress = &get_progress;
//		next->update = &stream_encoder_mt_update;

		next->coder->filters[0].id = LZMA_VLI_UNKNOWN;
		next->coder->index_encoder = LZMA_NEXT_CODER_INIT;
		next->coder->index = NULL;
		memzero(&next->coder->outq, sizeof(next->coder->outq));
		next->coder->threads = NULL;
		next->coder->threads_max = 0;
		next->coder->threads_initialized = 0;
	}

	// Basic initializations
	next->coder->sequence = SEQ_STREAM_HEADER;
	next->coder->block_size = (size_t)(block_size);
	next->coder->thread_error = LZMA_OK;
	next->coder->thr = NULL;

	// Allocate the thread-specific base structures.
	assert(options->threads > 0);
	if (next->coder->threads_max != options->threads) {
		threads_end(next->coder, allocator);

		next->coder->threads = NULL;
		next->coder->threads_max = 0;

		next->coder->threads_initialized = 0;
		next->coder->threads_free = NULL;

		next->coder->threads = lzma_alloc(
				options->threads * sizeof(worker_thread),
				allocator);
		if (next->coder->threads == NULL)
			return LZMA_MEM_ERROR;

		next->coder->threads_max = options->threads;
	} else {
		// Reuse the old structures and threads. Tell the running
		// threads to stop and wait until they have stopped.
		threads_stop(next->coder, true);
	}

	// Output queue
	return_if_error(lzma_outq_init(&next->coder->outq, allocator,
			outbuf_size_max, options->threads));

	// Timeout
	next->coder->timeout = options->timeout;

	// Free the old filter chain and copy the new one.
	for (size_t i = 0; next->coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
		lzma_free(next->coder->filters[i].options, allocator);

	return_if_error(lzma_filters_copy(
			filters, next->coder->filters, allocator));

	// Index
	lzma_index_end(next->coder->index, allocator);
	next->coder->index = lzma_index_init(allocator);
	if (next->coder->index == NULL)
		return LZMA_MEM_ERROR;

	// Stream Header
	next->coder->stream_flags.version = 0;
	next->coder->stream_flags.check = options->check;
	return_if_error(lzma_stream_header_encode(
			&next->coder->stream_flags, next->coder->header));

	next->coder->header_pos = 0;

	// Progress info
	next->coder->progress_in = 0;
	next->coder->progress_out = LZMA_STREAM_HEADER_SIZE;

	return LZMA_OK;
}


extern LZMA_API(lzma_ret)
lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
{
	lzma_next_strm_init(stream_encoder_mt_init, strm, options);

	strm->internal->supported_actions[LZMA_RUN] = true;
//	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
	strm->internal->supported_actions[LZMA_FINISH] = true;

	return LZMA_OK;
}


// This function name is a monster but it's consistent with the older
// monster names. :-( 31 chars is the max that C99 requires so in that
// sense it's not too long. ;-)
extern LZMA_API(uint64_t)
lzma_stream_encoder_mt_memusage(const lzma_mt *options)
{
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;

	if (get_options(options, &easy, &filters, &block_size,
			&outbuf_size_max) != LZMA_OK)
		return UINT64_MAX;

	// Memory usage of the input buffers
	//
	// NOTE: This cannot overflow because get_options() guarantees
	// block_size <= BLOCK_SIZE_MAX == UINT64_MAX / LZMA_THREADS_MAX
	// and threads <= LZMA_THREADS_MAX.
	const uint64_t inbuf_memusage = options->threads * block_size;

	// Memory usage of the filter encoders
	uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
	if (filters_memusage == UINT64_MAX)
		return UINT64_MAX;

	filters_memusage *= options->threads;

	// Memory usage of the output queue
	const uint64_t outq_memusage = lzma_outq_memusage(
			outbuf_size_max, options->threads);
	if (outq_memusage == UINT64_MAX)
		return UINT64_MAX;

	// Sum them with overflow checking.
	uint64_t total_memusage = LZMA_MEMUSAGE_BASE + sizeof(lzma_coder)
			+ options->threads * sizeof(worker_thread);

	if (UINT64_MAX - total_memusage < inbuf_memusage)
		return UINT64_MAX;

	total_memusage += inbuf_memusage;

	if (UINT64_MAX - total_memusage < filters_memusage)
		return UINT64_MAX;

	total_memusage += filters_memusage;

	if (UINT64_MAX - total_memusage < outq_memusage)
		return UINT64_MAX;

	return total_memusage + outq_memusage;
}