///////////////////////////////////////////////////////////////////////////////
//
/// \file       stream_encoder_mt.c
/// \brief      Multithreaded .xz Stream encoder
//
//  Author:     Lasse Collin
//
//  This file has been put into the public domain.
//  You can do whatever you want with this file.
//
///////////////////////////////////////////////////////////////////////////////

#include "filter_encoder.h"
#include "easy_preset.h"
#include "block_encoder.h"
#include "block_buffer_encoder.h"
#include "index_encoder.h"
#include "outqueue.h"


/// Maximum supported block size. This makes it simpler to prevent integer
/// overflows if we are given an unusually large block size.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)


typedef enum {
	/// Waiting for work.
	THR_IDLE,

	/// Encoding is in progress.
	THR_RUN,

	/// Encoding is in progress but no more input data will
	/// be read.
	THR_FINISH,

	/// The main thread wants the thread to stop whatever it was doing
	/// but not exit.
	THR_STOP,

	/// The main thread wants the thread to exit. We could use
	/// cancellation but since there's a stop state anyway, this is
	/// lazier.
	THR_EXIT,

} worker_state;

typedef struct lzma_stream_coder_s lzma_stream_coder;

typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
	worker_state state;

	/// Input buffer of coder->block_size bytes. The main thread will
	/// put new input into this and update in_size accordingly. Once
	/// no more input is coming, state will be set to THR_FINISH.
	uint8_t *in;

	/// Amount of data available in the input buffer. This is modified
	/// only by the main thread.
	size_t in_size;

	/// Output buffer for this thread. This is set by the main
	/// thread every time a new Block is started with this thread
	/// structure.
	lzma_outbuf *outbuf;

	/// Pointer to the main structure is needed when putting this
	/// thread back to the stack of free threads.
	lzma_stream_coder *coder;

	/// The allocator is set by the main thread. Since a copy of the
	/// pointer is kept here, the application must not change the
	/// allocator before calling lzma_end().
	const lzma_allocator *allocator;

	/// Amount of uncompressed data that has already been compressed.
	uint64_t progress_in;

	/// Amount of compressed data that is ready.
	uint64_t progress_out;

	/// Block encoder
	lzma_next_coder block_encoder;

	/// Compression options for this Block
	lzma_block block_options;

	/// Next structure in the stack of free worker threads.
	worker_thread *next;

	mythread_mutex mutex;
	mythread_cond cond;

	/// The ID of this thread is used to join the thread
	/// when it's not needed anymore.
	mythread thread_id;
};


struct lzma_stream_coder_s {
	enum {
		SEQ_STREAM_HEADER,
		SEQ_BLOCK,
		SEQ_INDEX,
		SEQ_STREAM_FOOTER,
	} sequence;

	/// Start a new Block every block_size bytes of input unless
	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
	size_t block_size;

	/// The filter chain currently in use
	lzma_filter filters[LZMA_FILTERS_MAX + 1];


	/// Index to hold sizes of the Blocks
	lzma_index *index;

	/// Index encoder
	lzma_next_coder index_encoder;


	/// Stream Flags for encoding the Stream Header and Stream Footer.
	lzma_stream_flags stream_flags;

	/// Buffer to hold Stream Header and Stream Footer.
	uint8_t header[LZMA_STREAM_HEADER_SIZE];

	/// Read position in header[]
	size_t header_pos;


	/// Output buffer queue for compressed data
	lzma_outq outq;


	/// Maximum wait time if we cannot use all the input and cannot
	/// fill the output buffer. This is in milliseconds.
	uint32_t timeout;


	/// Error code from a worker thread
	lzma_ret thread_error;

	/// Array of allocated thread-specific structures
	worker_thread *threads;

	/// Number of structures in "threads" above. This is also the
	/// number of threads that will be created at maximum.
	uint32_t threads_max;

	/// Number of thread structures that have been initialized, and
	/// thus the number of worker threads actually created so far.
	uint32_t threads_initialized;

	/// Stack of free threads. When a thread finishes, it puts itself
	/// back into this stack. This starts as empty because threads
	/// are created only when actually needed.
	worker_thread *threads_free;

	/// The most recent worker thread to which the main thread writes
	/// the new input from the application.
	worker_thread *thr;


	/// Amount of uncompressed data in Blocks that have already
	/// been finished.
	uint64_t progress_in;

	/// Amount of compressed data in Stream Header + Blocks that
	/// have already been finished.
	uint64_t progress_out;


	mythread_mutex mutex;
	mythread_cond cond;
};


/// Tell the main thread that something has gone wrong.
static void
worker_error(worker_thread *thr, lzma_ret ret)
{
	assert(ret != LZMA_OK);
	assert(ret != LZMA_STREAM_END);

	mythread_sync(thr->coder->mutex) {
		if (thr->coder->thread_error == LZMA_OK)
			thr->coder->thread_error = ret;

		mythread_cond_signal(&thr->coder->cond);
	}

	return;
}


static worker_state
worker_encode(worker_thread *thr, worker_state state)
{
	assert(thr->progress_in == 0);
	assert(thr->progress_out == 0);

	// Set the Block options.
	thr->block_options = (lzma_block){
		.version = 0,
		.check = thr->coder->stream_flags.check,
		.compressed_size = thr->coder->outq.buf_size_max,
		.uncompressed_size = thr->coder->block_size,

		// TODO: To allow changing the filter chain, the filters
		// array must be copied to each worker_thread.
		.filters = thr->coder->filters,
	};

	// Calculate the maximum size of the Block Header. This amount is
	// reserved in the beginning of the buffer so that the Block Header
	// along with the Compressed Size and Uncompressed Size fields can
	// be written there.
	lzma_ret ret = lzma_block_header_size(&thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Initialize the Block encoder.
	ret = lzma_block_encoder_init(&thr->block_encoder,
			thr->allocator, &thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	size_t in_pos = 0;
	size_t in_size = 0;

	thr->outbuf->size = thr->block_options.header_size;
	const size_t out_size = thr->coder->outq.buf_size_max;

	do {
		mythread_sync(thr->mutex) {
			// Store in_pos and the output size into *thr so that
			// an application may read them via
			// lzma_get_progress() to get progress information.
			//
			// NOTE: These aren't updated when the encoding
			// finishes. Instead, the final values are taken
			// later from thr->outbuf.
			thr->progress_in = in_pos;
			thr->progress_out = thr->outbuf->size;

			while (in_size == thr->in_size
					&& thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		// Return if we were asked to stop or exit.
		if (state >= THR_STOP)
			return state;

		lzma_action action = state == THR_FINISH
				? LZMA_FINISH : LZMA_RUN;

		// Limit the amount of input given to the Block encoder
		// at once. This way this thread can react fairly quickly
		// if the main thread wants us to stop or exit.
		static const size_t in_chunk_max = 16384;
		size_t in_limit = in_size;
		if (in_size - in_pos > in_chunk_max) {
			in_limit = in_pos + in_chunk_max;
			action = LZMA_RUN;
		}

		ret = thr->block_encoder.code(
				thr->block_encoder.coder, thr->allocator,
				thr->in, &in_pos, in_limit, thr->outbuf->buf,
				&thr->outbuf->size, out_size, action);
	} while (ret == LZMA_OK && thr->outbuf->size < out_size);

	switch (ret) {
	case LZMA_STREAM_END:
		assert(state == THR_FINISH);

		// Encode the Block Header. By doing it after
		// the compression, we can store the Compressed Size
		// and Uncompressed Size fields.
		ret = lzma_block_header_encode(&thr->block_options,
				thr->outbuf->buf);
		if (ret != LZMA_OK) {
			worker_error(thr, ret);
			return THR_STOP;
		}

		break;

	case LZMA_OK:
		// The data was incompressible, so encode it using
		// uncompressed LZMA2 chunks.
		//
		// First wait until we have gotten all the input.
		mythread_sync(thr->mutex) {
			while (thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		if (state >= THR_STOP)
			return state;

		// Do the encoding. This takes care of the Block Header too.
		thr->outbuf->size = 0;
		ret = lzma_block_uncomp_encode(&thr->block_options,
				thr->in, in_size, thr->outbuf->buf,
				&thr->outbuf->size, out_size);

		// It shouldn't fail.
		if (ret != LZMA_OK) {
			worker_error(thr, LZMA_PROG_ERROR);
			return THR_STOP;
		}

		break;

	default:
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Set the size information that will be read by the main thread
	// to write the Index field.
	thr->outbuf->unpadded_size
			= lzma_block_unpadded_size(&thr->block_options);
	assert(thr->outbuf->unpadded_size != 0);
	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

	return THR_FINISH;
}


static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
	worker_thread *thr = thr_ptr;
	worker_state state = THR_IDLE; // Init to silence a warning

	while (true) {
		// Wait for work.
		mythread_sync(thr->mutex) {
			while (true) {
				// The thread is already idle, so if we are
				// requested to stop, just set the state.
				if (thr->state == THR_STOP) {
					thr->state = THR_IDLE;
					mythread_cond_signal(&thr->cond);
				}

				state = thr->state;
				if (state != THR_IDLE)
					break;

				mythread_cond_wait(&thr->cond, &thr->mutex);
			}
		}

		assert(state != THR_IDLE);
		assert(state != THR_STOP);

		if (state <= THR_FINISH)
			state = worker_encode(thr, state);

		if (state == THR_EXIT)
			break;

		// Mark the thread as idle unless the main thread has
		// told us to exit. Signaling is needed for the case
		// where the main thread is waiting for the threads to stop.
		mythread_sync(thr->mutex) {
			if (thr->state != THR_EXIT) {
				thr->state = THR_IDLE;
				mythread_cond_signal(&thr->cond);
			}
		}

		mythread_sync(thr->coder->mutex) {
			// Mark the output buffer as finished if
			// no errors occurred.
			thr->outbuf->finished = state == THR_FINISH;

			// Update the main progress info.
			thr->coder->progress_in
					+= thr->outbuf->uncompressed_size;
			thr->coder->progress_out += thr->outbuf->size;
			thr->progress_in = 0;
			thr->progress_out = 0;

			// Return this thread to the stack of free threads.
			thr->next = thr->coder->threads_free;
			thr->coder->threads_free = thr;

			mythread_cond_signal(&thr->coder->cond);
		}
	}

	// Exiting, free the resources.
	mythread_mutex_destroy(&thr->mutex);
	mythread_cond_destroy(&thr->cond);

	lzma_next_end(&thr->block_encoder, thr->allocator);
	lzma_free(thr->in, thr->allocator);
	return MYTHREAD_RET_VALUE;
}
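

// Summary of how thr->state moves between the worker_state values; all of
// these transitions can be found in worker_start() above and in the helper
// functions below:
//
//   THR_IDLE -> THR_RUN      get_thread() gives the thread a new Block.
//   THR_RUN  -> THR_FINISH   stream_encode_in() once the Block has received
//                            all of its input.
//   any      -> THR_STOP     threads_stop(), after an error or when the
//                            encoder is reinitialized.
//   any      -> THR_EXIT     threads_end() when the coder is freed.
//   back to THR_IDLE         the worker itself, after it has finished a
//                            Block or acknowledged THR_STOP.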


/// Make the threads stop but not exit. Optionally wait for them to stop.
static void
threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
{
	// Tell the threads to stop.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_STOP;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	if (!wait_for_threads)
		return;

	// Wait for the threads to settle in the idle state.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			while (coder->threads[i].state != THR_IDLE)
				mythread_cond_wait(&coder->threads[i].cond,
						&coder->threads[i].mutex);
		}
	}

	return;
}


/// Stop the threads and free the resources associated with them.
/// Wait until the threads have exited.
static void
threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_EXIT;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		int ret = mythread_join(coder->threads[i].thread_id);
		assert(ret == 0);
		(void)ret;
	}

	lzma_free(coder->threads, allocator);
	return;
}


/// Initialize a new worker_thread structure and create a new thread.
static lzma_ret
initialize_new_thread(lzma_stream_coder *coder,
		const lzma_allocator *allocator)
{
	worker_thread *thr = &coder->threads[coder->threads_initialized];

	thr->in = lzma_alloc(coder->block_size, allocator);
	if (thr->in == NULL)
		return LZMA_MEM_ERROR;

	if (mythread_mutex_init(&thr->mutex))
		goto error_mutex;

	if (mythread_cond_init(&thr->cond))
		goto error_cond;

	thr->state = THR_IDLE;
	thr->allocator = allocator;
	thr->coder = coder;
	thr->progress_in = 0;
	thr->progress_out = 0;
	thr->block_encoder = LZMA_NEXT_CODER_INIT;

	if (mythread_create(&thr->thread_id, &worker_start, thr))
		goto error_thread;

	++coder->threads_initialized;
	coder->thr = thr;

	return LZMA_OK;

error_thread:
	mythread_cond_destroy(&thr->cond);

error_cond:
	mythread_mutex_destroy(&thr->mutex);

error_mutex:
	lzma_free(thr->in, allocator);
	return LZMA_MEM_ERROR;
}


static lzma_ret
get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
	// If there are no free output buffers, there is no point in
	// trying to get a thread.
	if (!lzma_outq_has_buf(&coder->outq))
		return LZMA_OK;

	// If there is a free structure on the stack, use it.
	mythread_sync(coder->mutex) {
		if (coder->threads_free != NULL) {
			coder->thr = coder->threads_free;
			coder->threads_free = coder->threads_free->next;
		}
	}

	if (coder->thr == NULL) {
		// If there are no uninitialized structures left, return.
		if (coder->threads_initialized == coder->threads_max)
			return LZMA_OK;

		// Initialize a new thread.
		return_if_error(initialize_new_thread(coder, allocator));
	}

	// Reset the parts of the thread state that have to be done
	// in the main thread.
	mythread_sync(coder->thr->mutex) {
		coder->thr->state = THR_RUN;
		coder->thr->in_size = 0;
		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
		mythread_cond_signal(&coder->thr->cond);
	}

	return LZMA_OK;
}


static lzma_ret
stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, lzma_action action)
{
	while (*in_pos < in_size
			|| (coder->thr != NULL && action != LZMA_RUN)) {
		if (coder->thr == NULL) {
			// Get a new thread.
			const lzma_ret ret = get_thread(coder, allocator);
			if (coder->thr == NULL)
				return ret;
		}

		// Copy the input data to the thread's buffer.
		size_t thr_in_size = coder->thr->in_size;
		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
				&thr_in_size, coder->block_size);

		// Tell the Block encoder to finish if
		// - it has got block_size bytes of input; or
		// - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
		//   or LZMA_FULL_BARRIER was used.
		//
		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
		const bool finish = thr_in_size == coder->block_size
				|| (*in_pos == in_size && action != LZMA_RUN);

		bool block_error = false;

		mythread_sync(coder->thr->mutex) {
			if (coder->thr->state == THR_IDLE) {
				// Something has gone wrong with the Block
				// encoder. It has set coder->thread_error,
				// which we will read a few lines later.
				block_error = true;
			} else {
				// Tell the Block encoder its new amount
				// of input and update the state if needed.
				coder->thr->in_size = thr_in_size;

				if (finish)
					coder->thr->state = THR_FINISH;

				mythread_cond_signal(&coder->thr->cond);
			}
		}

		if (block_error) {
			lzma_ret ret;

			mythread_sync(coder->mutex) {
				ret = coder->thread_error;
			}

			return ret;
		}

		if (finish)
			coder->thr = NULL;
	}

	return LZMA_OK;
}


/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached.
static bool
wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
		bool *has_blocked, bool has_input)
{
	if (coder->timeout != 0 && !*has_blocked) {
		// Every time stream_encode_mt() is called via
		// lzma_code(), *has_blocked starts as false. We set it
		// to true here and calculate the absolute time when
		// we must return if there's nothing to do.
		//
		// The idea of *has_blocked is to avoid unneeded calls
		// to mythread_condtime_set(), which may do a syscall
		// depending on the operating system.
		*has_blocked = true;
		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
	}

	bool timed_out = false;

	mythread_sync(coder->mutex) {
		// There are four things that we wait for. If one of them
		// becomes possible, we return:
		// - If there is input left, we need to get a free
		//   worker thread and an output buffer for it.
		// - There is data ready to be read from the output queue.
		// - A worker thread indicates an error.
		// - A timeout occurs.
		while ((!has_input || coder->threads_free == NULL
					|| !lzma_outq_has_buf(&coder->outq))
				&& !lzma_outq_is_readable(&coder->outq)
				&& coder->thread_error == LZMA_OK
				&& !timed_out) {
			if (coder->timeout != 0)
				timed_out = mythread_cond_timedwait(
						&coder->cond, &coder->mutex,
						wait_abs) != 0;
			else
				mythread_cond_wait(&coder->cond,
						&coder->mutex);
		}
	}

	return timed_out;
}


static lzma_ret
stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, uint8_t *restrict out,
		size_t *restrict out_pos, size_t out_size, lzma_action action)
{
	lzma_stream_coder *coder = coder_ptr;

	switch (coder->sequence) {
	case SEQ_STREAM_HEADER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		if (coder->header_pos < sizeof(coder->header))
			return LZMA_OK;

		coder->header_pos = 0;
		coder->sequence = SEQ_BLOCK;

	// Fall through

	case SEQ_BLOCK: {
		// Initialized to silence warnings.
		lzma_vli unpadded_size = 0;
		lzma_vli uncompressed_size = 0;
		lzma_ret ret = LZMA_OK;

		// These are for wait_for_work().
		bool has_blocked = false;
		mythread_condtime wait_abs;

		while (true) {
			mythread_sync(coder->mutex) {
				// Check for Block encoder errors.
				ret = coder->thread_error;
				if (ret != LZMA_OK) {
					assert(ret != LZMA_STREAM_END);
					break; // Break out of mythread_sync.
				}

				// Try to read compressed data to out[].
				ret = lzma_outq_read(&coder->outq,
						out, out_pos, out_size,
						&unpadded_size,
						&uncompressed_size);
			}

			if (ret == LZMA_STREAM_END) {
				// End of Block. Add it to the Index.
				ret = lzma_index_append(coder->index,
						allocator, unpadded_size,
						uncompressed_size);

				// If we didn't fill the output buffer yet,
				// try to read more data. Maybe the next
				// outbuf has been finished already too.
				if (*out_pos < out_size)
					continue;
			}

			if (ret != LZMA_OK) {
				// coder->thread_error was set or
				// lzma_index_append() failed.
				threads_stop(coder, false);
				return ret;
			}

			// Try to give uncompressed data to a worker thread.
			ret = stream_encode_in(coder, allocator,
					in, in_pos, in_size, action);
			if (ret != LZMA_OK) {
				threads_stop(coder, false);
				return ret;
			}

			// See if we should wait or return.
			//
			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
			if (*in_pos == in_size) {
				// LZMA_RUN: More data is probably coming
				// so return to let the caller fill the
				// input buffer.
				if (action == LZMA_RUN)
					return LZMA_OK;

				// LZMA_FULL_BARRIER: The same as with
				// LZMA_RUN but tell the caller that the
				// barrier was completed.
				if (action == LZMA_FULL_BARRIER)
					return LZMA_STREAM_END;

				// Finishing or flushing isn't completed until
				// all input data has been encoded and copied
				// to the output buffer.
				if (lzma_outq_is_empty(&coder->outq)) {
					// LZMA_FINISH: Continue to encode
					// the Index field.
					if (action == LZMA_FINISH)
						break;

					// LZMA_FULL_FLUSH: Return to tell
					// the caller that flushing was
					// completed.
					if (action == LZMA_FULL_FLUSH)
						return LZMA_STREAM_END;
				}
			}

			// Return if there is no output space left.
			// This check must be done after testing the input
			// buffer, because we might want to use a different
			// return code.
			if (*out_pos == out_size)
				return LZMA_OK;

			// Neither in nor out has been used completely.
			// Wait until there's something we can do.
			if (wait_for_work(coder, &wait_abs, &has_blocked,
					*in_pos < in_size))
				return LZMA_TIMED_OUT;
		}

		// All Blocks have been encoded and the threads have stopped.
		// Prepare to encode the Index field.
		return_if_error(lzma_index_encoder_init(
				&coder->index_encoder, allocator,
				coder->index));
		coder->sequence = SEQ_INDEX;

		// Update the progress info to take the Index and
		// Stream Footer into account. Those are very fast to encode
		// so in terms of progress information they can be considered
		// ready to be copied out.
		coder->progress_out += lzma_index_size(coder->index)
				+ LZMA_STREAM_HEADER_SIZE;
	}

	// Fall through

	case SEQ_INDEX: {
		// Call the Index encoder. It doesn't take any input, so
		// those pointers can be NULL.
		const lzma_ret ret = coder->index_encoder.code(
				coder->index_encoder.coder, allocator,
				NULL, NULL, 0,
				out, out_pos, out_size, LZMA_RUN);
		if (ret != LZMA_STREAM_END)
			return ret;

		// Encode the Stream Footer into coder->header[].
		coder->stream_flags.backward_size
				= lzma_index_size(coder->index);
		if (lzma_stream_footer_encode(&coder->stream_flags,
				coder->header) != LZMA_OK)
			return LZMA_PROG_ERROR;

		coder->sequence = SEQ_STREAM_FOOTER;
	}

	// Fall through

	case SEQ_STREAM_FOOTER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		return coder->header_pos < sizeof(coder->header)
				? LZMA_OK : LZMA_STREAM_END;
	}

	assert(0);
	return LZMA_PROG_ERROR;
}


static void
stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
{
	lzma_stream_coder *coder = coder_ptr;

	// Threads must be killed before the output queue can be freed.
	threads_end(coder, allocator);
	lzma_outq_end(&coder->outq, allocator);

	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
		lzma_free(coder->filters[i].options, allocator);

	lzma_next_end(&coder->index_encoder, allocator);
	lzma_index_end(coder->index, allocator);

	mythread_cond_destroy(&coder->cond);
	mythread_mutex_destroy(&coder->mutex);

	lzma_free(coder, allocator);
	return;
}


/// Options handling for lzma_stream_encoder_mt_init() and
/// lzma_stream_encoder_mt_memusage()
static lzma_ret
get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
		const lzma_filter **filters, uint64_t *block_size,
		uint64_t *outbuf_size_max)
{
	// Validate some of the options.
	if (options == NULL)
		return LZMA_PROG_ERROR;

	if (options->flags != 0 || options->threads == 0
			|| options->threads > LZMA_THREADS_MAX)
		return LZMA_OPTIONS_ERROR;

	if (options->filters != NULL) {
		// Filter chain was given, use it as is.
		*filters = options->filters;
	} else {
		// Use a preset.
		if (lzma_easy_preset(opt_easy, options->preset))
			return LZMA_OPTIONS_ERROR;

		*filters = opt_easy->filters;
	}

	// Block size
	if (options->block_size > 0) {
		if (options->block_size > BLOCK_SIZE_MAX)
			return LZMA_OPTIONS_ERROR;

		*block_size = options->block_size;
	} else {
		// Determine the Block size from the filter chain.
		*block_size = lzma_mt_block_size(*filters);
		if (*block_size == 0)
			return LZMA_OPTIONS_ERROR;

		assert(*block_size <= BLOCK_SIZE_MAX);
	}

	// Calculate the maximum amount of output that a single output buffer
	// may need to hold. This is the same as the maximum total size of
	// a Block.
	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
	if (*outbuf_size_max == 0)
		return LZMA_MEM_ERROR;

	return LZMA_OK;
}


static void
get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
{
	lzma_stream_coder *coder = coder_ptr;

	// Lock coder->mutex to prevent finishing threads from moving their
	// progress info from the worker_thread structure to lzma_stream_coder.
	mythread_sync(coder->mutex) {
		*progress_in = coder->progress_in;
		*progress_out = coder->progress_out;

		for (size_t i = 0; i < coder->threads_initialized; ++i) {
			mythread_sync(coder->threads[i].mutex) {
				*progress_in += coder->threads[i].progress_in;
				*progress_out += coder->threads[i]
						.progress_out;
			}
		}
	}

	return;
}


static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
		const lzma_mt *options)
{
	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);

	// Get the filter chain.
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;
	return_if_error(get_options(options, &easy, &filters,
			&block_size, &outbuf_size_max));

#if SIZE_MAX < UINT64_MAX
	if (block_size > SIZE_MAX)
		return LZMA_MEM_ERROR;
#endif

	// Validate the filter chain so that we can give an error in this
	// function instead of delaying it to the first call to lzma_code().
	// The memory usage calculation verifies the filter chain as
	// a side effect so we take advantage of that.
	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Validate the Check ID.
	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
		return LZMA_PROG_ERROR;

	if (!lzma_check_is_supported(options->check))
		return LZMA_UNSUPPORTED_CHECK;

	// Allocate and initialize the base structure if needed.
	lzma_stream_coder *coder = next->coder;
	if (coder == NULL) {
		coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
		if (coder == NULL)
			return LZMA_MEM_ERROR;

		next->coder = coder;

		// For the mutex and condition variable initializations
		// the error handling has to be done here because
		// stream_encoder_mt_end() doesn't know if they have
		// already been initialized or not.
		if (mythread_mutex_init(&coder->mutex)) {
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		if (mythread_cond_init(&coder->cond)) {
			mythread_mutex_destroy(&coder->mutex);
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		next->code = &stream_encode_mt;
		next->end = &stream_encoder_mt_end;
		next->get_progress = &get_progress;
		// next->update = &stream_encoder_mt_update;

		coder->filters[0].id = LZMA_VLI_UNKNOWN;
		coder->index_encoder = LZMA_NEXT_CODER_INIT;
		coder->index = NULL;
		memzero(&coder->outq, sizeof(coder->outq));
		coder->threads = NULL;
		coder->threads_max = 0;
		coder->threads_initialized = 0;
	}

	// Basic initializations
	coder->sequence = SEQ_STREAM_HEADER;
	coder->block_size = (size_t)(block_size);
	coder->thread_error = LZMA_OK;
	coder->thr = NULL;

	// Allocate the thread-specific base structures.
	assert(options->threads > 0);
	if (coder->threads_max != options->threads) {
		threads_end(coder, allocator);

		coder->threads = NULL;
		coder->threads_max = 0;

		coder->threads_initialized = 0;
		coder->threads_free = NULL;

		coder->threads = lzma_alloc(
				options->threads * sizeof(worker_thread),
				allocator);
		if (coder->threads == NULL)
			return LZMA_MEM_ERROR;

		coder->threads_max = options->threads;
	} else {
		// Reuse the old structures and threads. Tell the running
		// threads to stop and wait until they have stopped.
		threads_stop(coder, true);
	}

	// Output queue
	return_if_error(lzma_outq_init(&coder->outq, allocator,
			outbuf_size_max, options->threads));

	// Timeout
	coder->timeout = options->timeout;

	// Free the old filter chain and copy the new one.
	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
		lzma_free(coder->filters[i].options, allocator);

	return_if_error(lzma_filters_copy(
			filters, coder->filters, allocator));

	// Index
	lzma_index_end(coder->index, allocator);
	coder->index = lzma_index_init(allocator);
	if (coder->index == NULL)
		return LZMA_MEM_ERROR;

	// Stream Header
	coder->stream_flags.version = 0;
	coder->stream_flags.check = options->check;
	return_if_error(lzma_stream_header_encode(
			&coder->stream_flags, coder->header));

	coder->header_pos = 0;

	// Progress info
	coder->progress_in = 0;
	coder->progress_out = LZMA_STREAM_HEADER_SIZE;

	return LZMA_OK;
}


extern LZMA_API(lzma_ret)
lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
{
	lzma_next_strm_init(stream_encoder_mt_init, strm, options);

	strm->internal->supported_actions[LZMA_RUN] = true;
	// strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
	strm->internal->supported_actions[LZMA_FINISH] = true;

	return LZMA_OK;
}


// This function name is a monster but it's consistent with the older
// monster names. :-( 31 chars is the max that C99 requires so in that
// sense it's not too long. ;-)
extern LZMA_API(uint64_t)
lzma_stream_encoder_mt_memusage(const lzma_mt *options)
{
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;

	if (get_options(options, &easy, &filters, &block_size,
			&outbuf_size_max) != LZMA_OK)
		return UINT64_MAX;

	// Memory usage of the input buffers
	const uint64_t inbuf_memusage = options->threads * block_size;

	// Memory usage of the filter encoders
	uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
	if (filters_memusage == UINT64_MAX)
		return UINT64_MAX;

	filters_memusage *= options->threads;

	// Memory usage of the output queue
	const uint64_t outq_memusage = lzma_outq_memusage(
			outbuf_size_max, options->threads);
	if (outq_memusage == UINT64_MAX)
		return UINT64_MAX;

	// Sum them with overflow checking.
	uint64_t total_memusage = LZMA_MEMUSAGE_BASE
			+ sizeof(lzma_stream_coder)
			+ options->threads * sizeof(worker_thread);

	if (UINT64_MAX - total_memusage < inbuf_memusage)
		return UINT64_MAX;

	total_memusage += inbuf_memusage;

	if (UINT64_MAX - total_memusage < filters_memusage)
		return UINT64_MAX;

	total_memusage += filters_memusage;

	if (UINT64_MAX - total_memusage < outq_memusage)
		return UINT64_MAX;

	return total_memusage + outq_memusage;
}
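

// Illustrative usage sketch (not part of this encoder): one way an
// application could set up this encoder via the public API in <lzma.h>.
// The helper name, thread count, and check type below are example values
// chosen only for this sketch, and *strm is assumed to have been
// initialized with LZMA_STREAM_INIT by the caller.
//
//     #include <lzma.h>
//
//     static lzma_ret
//     init_mt_encoder(lzma_stream *strm)
//     {
//             lzma_mt mt = {
//                     .flags = 0,      // No flags are currently supported.
//                     .threads = 4,    // Example; see also lzma_cputhreads().
//                     .block_size = 0, // 0 = derive from the filter chain.
//                     .timeout = 0,    // 0 = lzma_code() never returns
//                                      // LZMA_TIMED_OUT.
//                     .preset = LZMA_PRESET_DEFAULT,
//                     .filters = NULL, // NULL = use the preset above.
//                     .check = LZMA_CHECK_CRC64,
//             };
//
//             // UINT64_MAX means that the options are not supported.
//             if (lzma_stream_encoder_mt_memusage(&mt) == UINT64_MAX)
//                     return LZMA_OPTIONS_ERROR;
//
//             return lzma_stream_encoder_mt(strm, &mt);
//     }
//
// After successful initialization the application feeds input with
// lzma_code(strm, LZMA_RUN) and finishes with lzma_code(strm, LZMA_FINISH)
// until it returns LZMA_STREAM_END, and finally calls lzma_end(strm).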