1*3b35e7eeSXin LI // SPDX-License-Identifier: 0BSD 2*3b35e7eeSXin LI 353200025SRui Paulo /////////////////////////////////////////////////////////////////////////////// 453200025SRui Paulo // 553200025SRui Paulo /// \file stream_encoder_mt.c 653200025SRui Paulo /// \brief Multithreaded .xz Stream encoder 753200025SRui Paulo // 853200025SRui Paulo // Author: Lasse Collin 953200025SRui Paulo // 1053200025SRui Paulo /////////////////////////////////////////////////////////////////////////////// 1153200025SRui Paulo 1253200025SRui Paulo #include "filter_encoder.h" 1353200025SRui Paulo #include "easy_preset.h" 1453200025SRui Paulo #include "block_encoder.h" 1553200025SRui Paulo #include "block_buffer_encoder.h" 1653200025SRui Paulo #include "index_encoder.h" 1753200025SRui Paulo #include "outqueue.h" 1853200025SRui Paulo 1953200025SRui Paulo 2053200025SRui Paulo /// Maximum supported block size. This makes it simpler to prevent integer 2153200025SRui Paulo /// overflows if we are given unusually large block size. 2253200025SRui Paulo #define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX) 2353200025SRui Paulo 2453200025SRui Paulo 2553200025SRui Paulo typedef enum { 2653200025SRui Paulo /// Waiting for work. 2753200025SRui Paulo THR_IDLE, 2853200025SRui Paulo 2953200025SRui Paulo /// Encoding is in progress. 3053200025SRui Paulo THR_RUN, 3153200025SRui Paulo 3253200025SRui Paulo /// Encoding is in progress but no more input data will 3353200025SRui Paulo /// be read. 3453200025SRui Paulo THR_FINISH, 3553200025SRui Paulo 3653200025SRui Paulo /// The main thread wants the thread to stop whatever it was doing 3753200025SRui Paulo /// but not exit. 3853200025SRui Paulo THR_STOP, 3953200025SRui Paulo 4053200025SRui Paulo /// The main thread wants the thread to exit. We could use 4153200025SRui Paulo /// cancellation but since there's stopped anyway, this is lazier. 4253200025SRui Paulo THR_EXIT, 4353200025SRui Paulo 4453200025SRui Paulo } worker_state; 4553200025SRui Paulo 461456f0f9SXin LI typedef struct lzma_stream_coder_s lzma_stream_coder; 4753200025SRui Paulo 4853200025SRui Paulo typedef struct worker_thread_s worker_thread; 4953200025SRui Paulo struct worker_thread_s { 5053200025SRui Paulo worker_state state; 5153200025SRui Paulo 5253200025SRui Paulo /// Input buffer of coder->block_size bytes. The main thread will 5353200025SRui Paulo /// put new input into this and update in_size accordingly. Once 5453200025SRui Paulo /// no more input is coming, state will be set to THR_FINISH. 5553200025SRui Paulo uint8_t *in; 5653200025SRui Paulo 5753200025SRui Paulo /// Amount of data available in the input buffer. This is modified 5853200025SRui Paulo /// only by the main thread. 5953200025SRui Paulo size_t in_size; 6053200025SRui Paulo 6153200025SRui Paulo /// Output buffer for this thread. This is set by the main 6253200025SRui Paulo /// thread every time a new Block is started with this thread 6353200025SRui Paulo /// structure. 6453200025SRui Paulo lzma_outbuf *outbuf; 6553200025SRui Paulo 6653200025SRui Paulo /// Pointer to the main structure is needed when putting this 6753200025SRui Paulo /// thread back to the stack of free threads. 681456f0f9SXin LI lzma_stream_coder *coder; 6953200025SRui Paulo 7053200025SRui Paulo /// The allocator is set by the main thread. Since a copy of the 7153200025SRui Paulo /// pointer is kept here, the application must not change the 7253200025SRui Paulo /// allocator before calling lzma_end(). 7353200025SRui Paulo const lzma_allocator *allocator; 7453200025SRui Paulo 7553200025SRui Paulo /// Amount of uncompressed data that has already been compressed. 7653200025SRui Paulo uint64_t progress_in; 7753200025SRui Paulo 7853200025SRui Paulo /// Amount of compressed data that is ready. 7953200025SRui Paulo uint64_t progress_out; 8053200025SRui Paulo 8153200025SRui Paulo /// Block encoder 8253200025SRui Paulo lzma_next_coder block_encoder; 8353200025SRui Paulo 8453200025SRui Paulo /// Compression options for this Block 8553200025SRui Paulo lzma_block block_options; 8653200025SRui Paulo 8773ed8e77SXin LI /// Filter chain for this thread. By copying the filters array 8873ed8e77SXin LI /// to each thread it is possible to change the filter chain 8973ed8e77SXin LI /// between Blocks using lzma_filters_update(). 9073ed8e77SXin LI lzma_filter filters[LZMA_FILTERS_MAX + 1]; 9173ed8e77SXin LI 9253200025SRui Paulo /// Next structure in the stack of free worker threads. 9353200025SRui Paulo worker_thread *next; 9453200025SRui Paulo 9553200025SRui Paulo mythread_mutex mutex; 9653200025SRui Paulo mythread_cond cond; 9753200025SRui Paulo 9853200025SRui Paulo /// The ID of this thread is used to join the thread 9953200025SRui Paulo /// when it's not needed anymore. 10053200025SRui Paulo mythread thread_id; 10153200025SRui Paulo }; 10253200025SRui Paulo 10353200025SRui Paulo 1041456f0f9SXin LI struct lzma_stream_coder_s { 10553200025SRui Paulo enum { 10653200025SRui Paulo SEQ_STREAM_HEADER, 10753200025SRui Paulo SEQ_BLOCK, 10853200025SRui Paulo SEQ_INDEX, 10953200025SRui Paulo SEQ_STREAM_FOOTER, 11053200025SRui Paulo } sequence; 11153200025SRui Paulo 11253200025SRui Paulo /// Start a new Block every block_size bytes of input unless 11353200025SRui Paulo /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier. 11453200025SRui Paulo size_t block_size; 11553200025SRui Paulo 11673ed8e77SXin LI /// The filter chain to use for the next Block. 11773ed8e77SXin LI /// This can be updated using lzma_filters_update() 11873ed8e77SXin LI /// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH. 11953200025SRui Paulo lzma_filter filters[LZMA_FILTERS_MAX + 1]; 12053200025SRui Paulo 12173ed8e77SXin LI /// A copy of filters[] will be put here when attempting to get 12273ed8e77SXin LI /// a new worker thread. This will be copied to a worker thread 12373ed8e77SXin LI /// when a thread becomes free and then this cache is marked as 12473ed8e77SXin LI /// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache 12573ed8e77SXin LI /// the filter options from filters[] would get uselessly copied 12673ed8e77SXin LI /// multiple times (allocated and freed) when waiting for a new free 12773ed8e77SXin LI /// worker thread. 12873ed8e77SXin LI /// 12973ed8e77SXin LI /// This is freed if filters[] is updated via lzma_filters_update(). 13073ed8e77SXin LI lzma_filter filters_cache[LZMA_FILTERS_MAX + 1]; 13173ed8e77SXin LI 13253200025SRui Paulo 13353200025SRui Paulo /// Index to hold sizes of the Blocks 13453200025SRui Paulo lzma_index *index; 13553200025SRui Paulo 13653200025SRui Paulo /// Index encoder 13753200025SRui Paulo lzma_next_coder index_encoder; 13853200025SRui Paulo 13953200025SRui Paulo 14053200025SRui Paulo /// Stream Flags for encoding the Stream Header and Stream Footer. 14153200025SRui Paulo lzma_stream_flags stream_flags; 14253200025SRui Paulo 14353200025SRui Paulo /// Buffer to hold Stream Header and Stream Footer. 14453200025SRui Paulo uint8_t header[LZMA_STREAM_HEADER_SIZE]; 14553200025SRui Paulo 14653200025SRui Paulo /// Read position in header[] 14753200025SRui Paulo size_t header_pos; 14853200025SRui Paulo 14953200025SRui Paulo 15053200025SRui Paulo /// Output buffer queue for compressed data 15153200025SRui Paulo lzma_outq outq; 15253200025SRui Paulo 15373ed8e77SXin LI /// How much memory to allocate for each lzma_outbuf.buf 15473ed8e77SXin LI size_t outbuf_alloc_size; 15573ed8e77SXin LI 15653200025SRui Paulo 15753200025SRui Paulo /// Maximum wait time if cannot use all the input and cannot 15853200025SRui Paulo /// fill the output buffer. This is in milliseconds. 15953200025SRui Paulo uint32_t timeout; 16053200025SRui Paulo 16153200025SRui Paulo 16253200025SRui Paulo /// Error code from a worker thread 16353200025SRui Paulo lzma_ret thread_error; 16453200025SRui Paulo 16553200025SRui Paulo /// Array of allocated thread-specific structures 16653200025SRui Paulo worker_thread *threads; 16753200025SRui Paulo 16853200025SRui Paulo /// Number of structures in "threads" above. This is also the 16953200025SRui Paulo /// number of threads that will be created at maximum. 17053200025SRui Paulo uint32_t threads_max; 17153200025SRui Paulo 17253200025SRui Paulo /// Number of thread structures that have been initialized, and 17353200025SRui Paulo /// thus the number of worker threads actually created so far. 17453200025SRui Paulo uint32_t threads_initialized; 17553200025SRui Paulo 17653200025SRui Paulo /// Stack of free threads. When a thread finishes, it puts itself 17753200025SRui Paulo /// back into this stack. This starts as empty because threads 17853200025SRui Paulo /// are created only when actually needed. 17953200025SRui Paulo worker_thread *threads_free; 18053200025SRui Paulo 18153200025SRui Paulo /// The most recent worker thread to which the main thread writes 18253200025SRui Paulo /// the new input from the application. 18353200025SRui Paulo worker_thread *thr; 18453200025SRui Paulo 18553200025SRui Paulo 18653200025SRui Paulo /// Amount of uncompressed data in Blocks that have already 18753200025SRui Paulo /// been finished. 18853200025SRui Paulo uint64_t progress_in; 18953200025SRui Paulo 19053200025SRui Paulo /// Amount of compressed data in Stream Header + Blocks that 19153200025SRui Paulo /// have already been finished. 19253200025SRui Paulo uint64_t progress_out; 19353200025SRui Paulo 19453200025SRui Paulo 19553200025SRui Paulo mythread_mutex mutex; 19653200025SRui Paulo mythread_cond cond; 19753200025SRui Paulo }; 19853200025SRui Paulo 19953200025SRui Paulo 20053200025SRui Paulo /// Tell the main thread that something has gone wrong. 20153200025SRui Paulo static void 20253200025SRui Paulo worker_error(worker_thread *thr, lzma_ret ret) 20353200025SRui Paulo { 20453200025SRui Paulo assert(ret != LZMA_OK); 20553200025SRui Paulo assert(ret != LZMA_STREAM_END); 20653200025SRui Paulo 20753200025SRui Paulo mythread_sync(thr->coder->mutex) { 20853200025SRui Paulo if (thr->coder->thread_error == LZMA_OK) 20953200025SRui Paulo thr->coder->thread_error = ret; 21053200025SRui Paulo 21153200025SRui Paulo mythread_cond_signal(&thr->coder->cond); 21253200025SRui Paulo } 21353200025SRui Paulo 21453200025SRui Paulo return; 21553200025SRui Paulo } 21653200025SRui Paulo 21753200025SRui Paulo 21853200025SRui Paulo static worker_state 21973ed8e77SXin LI worker_encode(worker_thread *thr, size_t *out_pos, worker_state state) 22053200025SRui Paulo { 22153200025SRui Paulo assert(thr->progress_in == 0); 22253200025SRui Paulo assert(thr->progress_out == 0); 22353200025SRui Paulo 22453200025SRui Paulo // Set the Block options. 22553200025SRui Paulo thr->block_options = (lzma_block){ 22653200025SRui Paulo .version = 0, 22753200025SRui Paulo .check = thr->coder->stream_flags.check, 22873ed8e77SXin LI .compressed_size = thr->outbuf->allocated, 22953200025SRui Paulo .uncompressed_size = thr->coder->block_size, 23073ed8e77SXin LI .filters = thr->filters, 23153200025SRui Paulo }; 23253200025SRui Paulo 23353200025SRui Paulo // Calculate maximum size of the Block Header. This amount is 23453200025SRui Paulo // reserved in the beginning of the buffer so that Block Header 23553200025SRui Paulo // along with Compressed Size and Uncompressed Size can be 23653200025SRui Paulo // written there. 23753200025SRui Paulo lzma_ret ret = lzma_block_header_size(&thr->block_options); 23853200025SRui Paulo if (ret != LZMA_OK) { 23953200025SRui Paulo worker_error(thr, ret); 24053200025SRui Paulo return THR_STOP; 24153200025SRui Paulo } 24253200025SRui Paulo 24353200025SRui Paulo // Initialize the Block encoder. 24453200025SRui Paulo ret = lzma_block_encoder_init(&thr->block_encoder, 24553200025SRui Paulo thr->allocator, &thr->block_options); 24653200025SRui Paulo if (ret != LZMA_OK) { 24753200025SRui Paulo worker_error(thr, ret); 24853200025SRui Paulo return THR_STOP; 24953200025SRui Paulo } 25053200025SRui Paulo 25153200025SRui Paulo size_t in_pos = 0; 25253200025SRui Paulo size_t in_size = 0; 25353200025SRui Paulo 25473ed8e77SXin LI *out_pos = thr->block_options.header_size; 25573ed8e77SXin LI const size_t out_size = thr->outbuf->allocated; 25653200025SRui Paulo 25753200025SRui Paulo do { 25853200025SRui Paulo mythread_sync(thr->mutex) { 25973ed8e77SXin LI // Store in_pos and *out_pos into *thr so that 26053200025SRui Paulo // an application may read them via 26153200025SRui Paulo // lzma_get_progress() to get progress information. 26253200025SRui Paulo // 26353200025SRui Paulo // NOTE: These aren't updated when the encoding 26453200025SRui Paulo // finishes. Instead, the final values are taken 26553200025SRui Paulo // later from thr->outbuf. 26653200025SRui Paulo thr->progress_in = in_pos; 26773ed8e77SXin LI thr->progress_out = *out_pos; 26853200025SRui Paulo 26953200025SRui Paulo while (in_size == thr->in_size 27053200025SRui Paulo && thr->state == THR_RUN) 27153200025SRui Paulo mythread_cond_wait(&thr->cond, &thr->mutex); 27253200025SRui Paulo 27353200025SRui Paulo state = thr->state; 27453200025SRui Paulo in_size = thr->in_size; 27553200025SRui Paulo } 27653200025SRui Paulo 27753200025SRui Paulo // Return if we were asked to stop or exit. 27853200025SRui Paulo if (state >= THR_STOP) 27953200025SRui Paulo return state; 28053200025SRui Paulo 28153200025SRui Paulo lzma_action action = state == THR_FINISH 28253200025SRui Paulo ? LZMA_FINISH : LZMA_RUN; 28353200025SRui Paulo 28453200025SRui Paulo // Limit the amount of input given to the Block encoder 28553200025SRui Paulo // at once. This way this thread can react fairly quickly 28653200025SRui Paulo // if the main thread wants us to stop or exit. 28753200025SRui Paulo static const size_t in_chunk_max = 16384; 28853200025SRui Paulo size_t in_limit = in_size; 28953200025SRui Paulo if (in_size - in_pos > in_chunk_max) { 29053200025SRui Paulo in_limit = in_pos + in_chunk_max; 29153200025SRui Paulo action = LZMA_RUN; 29253200025SRui Paulo } 29353200025SRui Paulo 29453200025SRui Paulo ret = thr->block_encoder.code( 29553200025SRui Paulo thr->block_encoder.coder, thr->allocator, 29653200025SRui Paulo thr->in, &in_pos, in_limit, thr->outbuf->buf, 29773ed8e77SXin LI out_pos, out_size, action); 29873ed8e77SXin LI } while (ret == LZMA_OK && *out_pos < out_size); 29953200025SRui Paulo 30053200025SRui Paulo switch (ret) { 30153200025SRui Paulo case LZMA_STREAM_END: 30253200025SRui Paulo assert(state == THR_FINISH); 30353200025SRui Paulo 30453200025SRui Paulo // Encode the Block Header. By doing it after 30553200025SRui Paulo // the compression, we can store the Compressed Size 30653200025SRui Paulo // and Uncompressed Size fields. 30753200025SRui Paulo ret = lzma_block_header_encode(&thr->block_options, 30853200025SRui Paulo thr->outbuf->buf); 30953200025SRui Paulo if (ret != LZMA_OK) { 31053200025SRui Paulo worker_error(thr, ret); 31153200025SRui Paulo return THR_STOP; 31253200025SRui Paulo } 31353200025SRui Paulo 31453200025SRui Paulo break; 31553200025SRui Paulo 31653200025SRui Paulo case LZMA_OK: 31753200025SRui Paulo // The data was incompressible. Encode it using uncompressed 31853200025SRui Paulo // LZMA2 chunks. 31953200025SRui Paulo // 32053200025SRui Paulo // First wait that we have gotten all the input. 32153200025SRui Paulo mythread_sync(thr->mutex) { 32253200025SRui Paulo while (thr->state == THR_RUN) 32353200025SRui Paulo mythread_cond_wait(&thr->cond, &thr->mutex); 32453200025SRui Paulo 32553200025SRui Paulo state = thr->state; 32653200025SRui Paulo in_size = thr->in_size; 32753200025SRui Paulo } 32853200025SRui Paulo 32953200025SRui Paulo if (state >= THR_STOP) 33053200025SRui Paulo return state; 33153200025SRui Paulo 33253200025SRui Paulo // Do the encoding. This takes care of the Block Header too. 33373ed8e77SXin LI *out_pos = 0; 33453200025SRui Paulo ret = lzma_block_uncomp_encode(&thr->block_options, 33553200025SRui Paulo thr->in, in_size, thr->outbuf->buf, 33673ed8e77SXin LI out_pos, out_size); 33753200025SRui Paulo 33853200025SRui Paulo // It shouldn't fail. 33953200025SRui Paulo if (ret != LZMA_OK) { 34053200025SRui Paulo worker_error(thr, LZMA_PROG_ERROR); 34153200025SRui Paulo return THR_STOP; 34253200025SRui Paulo } 34353200025SRui Paulo 34453200025SRui Paulo break; 34553200025SRui Paulo 34653200025SRui Paulo default: 34753200025SRui Paulo worker_error(thr, ret); 34853200025SRui Paulo return THR_STOP; 34953200025SRui Paulo } 35053200025SRui Paulo 35153200025SRui Paulo // Set the size information that will be read by the main thread 35253200025SRui Paulo // to write the Index field. 35353200025SRui Paulo thr->outbuf->unpadded_size 35453200025SRui Paulo = lzma_block_unpadded_size(&thr->block_options); 35553200025SRui Paulo assert(thr->outbuf->unpadded_size != 0); 35653200025SRui Paulo thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size; 35753200025SRui Paulo 35853200025SRui Paulo return THR_FINISH; 35953200025SRui Paulo } 36053200025SRui Paulo 36153200025SRui Paulo 36253200025SRui Paulo static MYTHREAD_RET_TYPE 36353200025SRui Paulo worker_start(void *thr_ptr) 36453200025SRui Paulo { 36553200025SRui Paulo worker_thread *thr = thr_ptr; 36653200025SRui Paulo worker_state state = THR_IDLE; // Init to silence a warning 36753200025SRui Paulo 36853200025SRui Paulo while (true) { 36953200025SRui Paulo // Wait for work. 37053200025SRui Paulo mythread_sync(thr->mutex) { 37153200025SRui Paulo while (true) { 37253200025SRui Paulo // The thread is already idle so if we are 37353200025SRui Paulo // requested to stop, just set the state. 37453200025SRui Paulo if (thr->state == THR_STOP) { 37553200025SRui Paulo thr->state = THR_IDLE; 37653200025SRui Paulo mythread_cond_signal(&thr->cond); 37753200025SRui Paulo } 37853200025SRui Paulo 37953200025SRui Paulo state = thr->state; 38053200025SRui Paulo if (state != THR_IDLE) 38153200025SRui Paulo break; 38253200025SRui Paulo 38353200025SRui Paulo mythread_cond_wait(&thr->cond, &thr->mutex); 38453200025SRui Paulo } 38553200025SRui Paulo } 38653200025SRui Paulo 38773ed8e77SXin LI size_t out_pos = 0; 38873ed8e77SXin LI 38953200025SRui Paulo assert(state != THR_IDLE); 39053200025SRui Paulo assert(state != THR_STOP); 39153200025SRui Paulo 39253200025SRui Paulo if (state <= THR_FINISH) 39373ed8e77SXin LI state = worker_encode(thr, &out_pos, state); 39453200025SRui Paulo 39553200025SRui Paulo if (state == THR_EXIT) 39653200025SRui Paulo break; 39753200025SRui Paulo 39853200025SRui Paulo // Mark the thread as idle unless the main thread has 39953200025SRui Paulo // told us to exit. Signal is needed for the case 40053200025SRui Paulo // where the main thread is waiting for the threads to stop. 40153200025SRui Paulo mythread_sync(thr->mutex) { 40253200025SRui Paulo if (thr->state != THR_EXIT) { 40353200025SRui Paulo thr->state = THR_IDLE; 40453200025SRui Paulo mythread_cond_signal(&thr->cond); 40553200025SRui Paulo } 40653200025SRui Paulo } 40753200025SRui Paulo 40853200025SRui Paulo mythread_sync(thr->coder->mutex) { 40973ed8e77SXin LI // If no errors occurred, make the encoded data 41073ed8e77SXin LI // available to be copied out. 41173ed8e77SXin LI if (state == THR_FINISH) { 41273ed8e77SXin LI thr->outbuf->pos = out_pos; 41373ed8e77SXin LI thr->outbuf->finished = true; 41473ed8e77SXin LI } 41553200025SRui Paulo 41653200025SRui Paulo // Update the main progress info. 41753200025SRui Paulo thr->coder->progress_in 41853200025SRui Paulo += thr->outbuf->uncompressed_size; 41973ed8e77SXin LI thr->coder->progress_out += out_pos; 42053200025SRui Paulo thr->progress_in = 0; 42153200025SRui Paulo thr->progress_out = 0; 42253200025SRui Paulo 42353200025SRui Paulo // Return this thread to the stack of free threads. 42453200025SRui Paulo thr->next = thr->coder->threads_free; 42553200025SRui Paulo thr->coder->threads_free = thr; 42653200025SRui Paulo 42753200025SRui Paulo mythread_cond_signal(&thr->coder->cond); 42853200025SRui Paulo } 42953200025SRui Paulo } 43053200025SRui Paulo 43153200025SRui Paulo // Exiting, free the resources. 43273ed8e77SXin LI lzma_filters_free(thr->filters, thr->allocator); 43373ed8e77SXin LI 43453200025SRui Paulo mythread_mutex_destroy(&thr->mutex); 43553200025SRui Paulo mythread_cond_destroy(&thr->cond); 43653200025SRui Paulo 43753200025SRui Paulo lzma_next_end(&thr->block_encoder, thr->allocator); 43853200025SRui Paulo lzma_free(thr->in, thr->allocator); 43953200025SRui Paulo return MYTHREAD_RET_VALUE; 44053200025SRui Paulo } 44153200025SRui Paulo 44253200025SRui Paulo 44353200025SRui Paulo /// Make the threads stop but not exit. Optionally wait for them to stop. 44453200025SRui Paulo static void 4451456f0f9SXin LI threads_stop(lzma_stream_coder *coder, bool wait_for_threads) 44653200025SRui Paulo { 44753200025SRui Paulo // Tell the threads to stop. 44853200025SRui Paulo for (uint32_t i = 0; i < coder->threads_initialized; ++i) { 44953200025SRui Paulo mythread_sync(coder->threads[i].mutex) { 45053200025SRui Paulo coder->threads[i].state = THR_STOP; 45153200025SRui Paulo mythread_cond_signal(&coder->threads[i].cond); 45253200025SRui Paulo } 45353200025SRui Paulo } 45453200025SRui Paulo 45553200025SRui Paulo if (!wait_for_threads) 45653200025SRui Paulo return; 45753200025SRui Paulo 45853200025SRui Paulo // Wait for the threads to settle in the idle state. 45953200025SRui Paulo for (uint32_t i = 0; i < coder->threads_initialized; ++i) { 46053200025SRui Paulo mythread_sync(coder->threads[i].mutex) { 46153200025SRui Paulo while (coder->threads[i].state != THR_IDLE) 46253200025SRui Paulo mythread_cond_wait(&coder->threads[i].cond, 46353200025SRui Paulo &coder->threads[i].mutex); 46453200025SRui Paulo } 46553200025SRui Paulo } 46653200025SRui Paulo 46753200025SRui Paulo return; 46853200025SRui Paulo } 46953200025SRui Paulo 47053200025SRui Paulo 47153200025SRui Paulo /// Stop the threads and free the resources associated with them. 47253200025SRui Paulo /// Wait until the threads have exited. 47353200025SRui Paulo static void 4741456f0f9SXin LI threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator) 47553200025SRui Paulo { 47653200025SRui Paulo for (uint32_t i = 0; i < coder->threads_initialized; ++i) { 47753200025SRui Paulo mythread_sync(coder->threads[i].mutex) { 47853200025SRui Paulo coder->threads[i].state = THR_EXIT; 47953200025SRui Paulo mythread_cond_signal(&coder->threads[i].cond); 48053200025SRui Paulo } 48153200025SRui Paulo } 48253200025SRui Paulo 48353200025SRui Paulo for (uint32_t i = 0; i < coder->threads_initialized; ++i) { 48453200025SRui Paulo int ret = mythread_join(coder->threads[i].thread_id); 48553200025SRui Paulo assert(ret == 0); 48653200025SRui Paulo (void)ret; 48753200025SRui Paulo } 48853200025SRui Paulo 48953200025SRui Paulo lzma_free(coder->threads, allocator); 49053200025SRui Paulo return; 49153200025SRui Paulo } 49253200025SRui Paulo 49353200025SRui Paulo 49453200025SRui Paulo /// Initialize a new worker_thread structure and create a new thread. 49553200025SRui Paulo static lzma_ret 4961456f0f9SXin LI initialize_new_thread(lzma_stream_coder *coder, 4971456f0f9SXin LI const lzma_allocator *allocator) 49853200025SRui Paulo { 49953200025SRui Paulo worker_thread *thr = &coder->threads[coder->threads_initialized]; 50053200025SRui Paulo 50153200025SRui Paulo thr->in = lzma_alloc(coder->block_size, allocator); 50253200025SRui Paulo if (thr->in == NULL) 50353200025SRui Paulo return LZMA_MEM_ERROR; 50453200025SRui Paulo 50553200025SRui Paulo if (mythread_mutex_init(&thr->mutex)) 50653200025SRui Paulo goto error_mutex; 50753200025SRui Paulo 50853200025SRui Paulo if (mythread_cond_init(&thr->cond)) 50953200025SRui Paulo goto error_cond; 51053200025SRui Paulo 51153200025SRui Paulo thr->state = THR_IDLE; 51253200025SRui Paulo thr->allocator = allocator; 51353200025SRui Paulo thr->coder = coder; 51453200025SRui Paulo thr->progress_in = 0; 51553200025SRui Paulo thr->progress_out = 0; 51653200025SRui Paulo thr->block_encoder = LZMA_NEXT_CODER_INIT; 51773ed8e77SXin LI thr->filters[0].id = LZMA_VLI_UNKNOWN; 51853200025SRui Paulo 51953200025SRui Paulo if (mythread_create(&thr->thread_id, &worker_start, thr)) 52053200025SRui Paulo goto error_thread; 52153200025SRui Paulo 52253200025SRui Paulo ++coder->threads_initialized; 52353200025SRui Paulo coder->thr = thr; 52453200025SRui Paulo 52553200025SRui Paulo return LZMA_OK; 52653200025SRui Paulo 52753200025SRui Paulo error_thread: 52853200025SRui Paulo mythread_cond_destroy(&thr->cond); 52953200025SRui Paulo 53053200025SRui Paulo error_cond: 53153200025SRui Paulo mythread_mutex_destroy(&thr->mutex); 53253200025SRui Paulo 53353200025SRui Paulo error_mutex: 53453200025SRui Paulo lzma_free(thr->in, allocator); 53553200025SRui Paulo return LZMA_MEM_ERROR; 53653200025SRui Paulo } 53753200025SRui Paulo 53853200025SRui Paulo 53953200025SRui Paulo static lzma_ret 5401456f0f9SXin LI get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator) 54153200025SRui Paulo { 54253200025SRui Paulo // If there are no free output subqueues, there is no 54353200025SRui Paulo // point to try getting a thread. 54453200025SRui Paulo if (!lzma_outq_has_buf(&coder->outq)) 54553200025SRui Paulo return LZMA_OK; 54653200025SRui Paulo 54773ed8e77SXin LI // That's also true if we cannot allocate memory for the output 54873ed8e77SXin LI // buffer in the output queue. 54973ed8e77SXin LI return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator, 55073ed8e77SXin LI coder->outbuf_alloc_size)); 55173ed8e77SXin LI 55273ed8e77SXin LI // Make a thread-specific copy of the filter chain. Put it in 55373ed8e77SXin LI // the cache array first so that if we cannot get a new thread yet, 55473ed8e77SXin LI // the allocation is ready when we try again. 55573ed8e77SXin LI if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN) 55673ed8e77SXin LI return_if_error(lzma_filters_copy( 55773ed8e77SXin LI coder->filters, coder->filters_cache, allocator)); 55873ed8e77SXin LI 55953200025SRui Paulo // If there is a free structure on the stack, use it. 56053200025SRui Paulo mythread_sync(coder->mutex) { 56153200025SRui Paulo if (coder->threads_free != NULL) { 56253200025SRui Paulo coder->thr = coder->threads_free; 56353200025SRui Paulo coder->threads_free = coder->threads_free->next; 56453200025SRui Paulo } 56553200025SRui Paulo } 56653200025SRui Paulo 56753200025SRui Paulo if (coder->thr == NULL) { 56853200025SRui Paulo // If there are no uninitialized structures left, return. 56953200025SRui Paulo if (coder->threads_initialized == coder->threads_max) 57053200025SRui Paulo return LZMA_OK; 57153200025SRui Paulo 57253200025SRui Paulo // Initialize a new thread. 57353200025SRui Paulo return_if_error(initialize_new_thread(coder, allocator)); 57453200025SRui Paulo } 57553200025SRui Paulo 57653200025SRui Paulo // Reset the parts of the thread state that have to be done 57753200025SRui Paulo // in the main thread. 57853200025SRui Paulo mythread_sync(coder->thr->mutex) { 57953200025SRui Paulo coder->thr->state = THR_RUN; 58053200025SRui Paulo coder->thr->in_size = 0; 58173ed8e77SXin LI coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL); 58273ed8e77SXin LI 58373ed8e77SXin LI // Free the old thread-specific filter options and replace 58473ed8e77SXin LI // them with the already-allocated new options from 58573ed8e77SXin LI // coder->filters_cache[]. Then mark the cache as empty. 58673ed8e77SXin LI lzma_filters_free(coder->thr->filters, allocator); 58773ed8e77SXin LI memcpy(coder->thr->filters, coder->filters_cache, 58873ed8e77SXin LI sizeof(coder->filters_cache)); 58973ed8e77SXin LI coder->filters_cache[0].id = LZMA_VLI_UNKNOWN; 59073ed8e77SXin LI 59153200025SRui Paulo mythread_cond_signal(&coder->thr->cond); 59253200025SRui Paulo } 59353200025SRui Paulo 59453200025SRui Paulo return LZMA_OK; 59553200025SRui Paulo } 59653200025SRui Paulo 59753200025SRui Paulo 59853200025SRui Paulo static lzma_ret 5991456f0f9SXin LI stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator, 60053200025SRui Paulo const uint8_t *restrict in, size_t *restrict in_pos, 60153200025SRui Paulo size_t in_size, lzma_action action) 60253200025SRui Paulo { 60353200025SRui Paulo while (*in_pos < in_size 60453200025SRui Paulo || (coder->thr != NULL && action != LZMA_RUN)) { 60553200025SRui Paulo if (coder->thr == NULL) { 60653200025SRui Paulo // Get a new thread. 60753200025SRui Paulo const lzma_ret ret = get_thread(coder, allocator); 60853200025SRui Paulo if (coder->thr == NULL) 60953200025SRui Paulo return ret; 61053200025SRui Paulo } 61153200025SRui Paulo 61253200025SRui Paulo // Copy the input data to thread's buffer. 61353200025SRui Paulo size_t thr_in_size = coder->thr->in_size; 61453200025SRui Paulo lzma_bufcpy(in, in_pos, in_size, coder->thr->in, 61553200025SRui Paulo &thr_in_size, coder->block_size); 61653200025SRui Paulo 61753200025SRui Paulo // Tell the Block encoder to finish if 61853200025SRui Paulo // - it has got block_size bytes of input; or 61953200025SRui Paulo // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH, 62053200025SRui Paulo // or LZMA_FULL_BARRIER was used. 62153200025SRui Paulo // 62253200025SRui Paulo // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER. 62353200025SRui Paulo const bool finish = thr_in_size == coder->block_size 62453200025SRui Paulo || (*in_pos == in_size && action != LZMA_RUN); 62553200025SRui Paulo 62653200025SRui Paulo bool block_error = false; 62753200025SRui Paulo 62853200025SRui Paulo mythread_sync(coder->thr->mutex) { 62953200025SRui Paulo if (coder->thr->state == THR_IDLE) { 63053200025SRui Paulo // Something has gone wrong with the Block 63153200025SRui Paulo // encoder. It has set coder->thread_error 63253200025SRui Paulo // which we will read a few lines later. 63353200025SRui Paulo block_error = true; 63453200025SRui Paulo } else { 63553200025SRui Paulo // Tell the Block encoder its new amount 63653200025SRui Paulo // of input and update the state if needed. 63753200025SRui Paulo coder->thr->in_size = thr_in_size; 63853200025SRui Paulo 63953200025SRui Paulo if (finish) 64053200025SRui Paulo coder->thr->state = THR_FINISH; 64153200025SRui Paulo 64253200025SRui Paulo mythread_cond_signal(&coder->thr->cond); 64353200025SRui Paulo } 64453200025SRui Paulo } 64553200025SRui Paulo 64653200025SRui Paulo if (block_error) { 647c917796cSXin LI lzma_ret ret = LZMA_OK; // Init to silence a warning. 64853200025SRui Paulo 64953200025SRui Paulo mythread_sync(coder->mutex) { 65053200025SRui Paulo ret = coder->thread_error; 65153200025SRui Paulo } 65253200025SRui Paulo 65353200025SRui Paulo return ret; 65453200025SRui Paulo } 65553200025SRui Paulo 65653200025SRui Paulo if (finish) 65753200025SRui Paulo coder->thr = NULL; 65853200025SRui Paulo } 65953200025SRui Paulo 66053200025SRui Paulo return LZMA_OK; 66153200025SRui Paulo } 66253200025SRui Paulo 66353200025SRui Paulo 66453200025SRui Paulo /// Wait until more input can be consumed, more output can be read, or 66553200025SRui Paulo /// an optional timeout is reached. 66653200025SRui Paulo static bool 6671456f0f9SXin LI wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs, 66853200025SRui Paulo bool *has_blocked, bool has_input) 66953200025SRui Paulo { 67053200025SRui Paulo if (coder->timeout != 0 && !*has_blocked) { 67153200025SRui Paulo // Every time when stream_encode_mt() is called via 67253200025SRui Paulo // lzma_code(), *has_blocked starts as false. We set it 67353200025SRui Paulo // to true here and calculate the absolute time when 67453200025SRui Paulo // we must return if there's nothing to do. 67553200025SRui Paulo // 67673ed8e77SXin LI // This way if we block multiple times for short moments 67773ed8e77SXin LI // less than "timeout" milliseconds, we will return once 67873ed8e77SXin LI // "timeout" amount of time has passed since the *first* 67973ed8e77SXin LI // blocking occurred. If the absolute time was calculated 68073ed8e77SXin LI // again every time we block, "timeout" would effectively 68173ed8e77SXin LI // be meaningless if we never consecutively block longer 68273ed8e77SXin LI // than "timeout" ms. 68353200025SRui Paulo *has_blocked = true; 68453200025SRui Paulo mythread_condtime_set(wait_abs, &coder->cond, coder->timeout); 68553200025SRui Paulo } 68653200025SRui Paulo 68753200025SRui Paulo bool timed_out = false; 68853200025SRui Paulo 68953200025SRui Paulo mythread_sync(coder->mutex) { 69053200025SRui Paulo // There are four things that we wait. If one of them 69153200025SRui Paulo // becomes possible, we return. 69253200025SRui Paulo // - If there is input left, we need to get a free 69353200025SRui Paulo // worker thread and an output buffer for it. 69453200025SRui Paulo // - Data ready to be read from the output queue. 69553200025SRui Paulo // - A worker thread indicates an error. 69653200025SRui Paulo // - Time out occurs. 69753200025SRui Paulo while ((!has_input || coder->threads_free == NULL 69853200025SRui Paulo || !lzma_outq_has_buf(&coder->outq)) 69953200025SRui Paulo && !lzma_outq_is_readable(&coder->outq) 70053200025SRui Paulo && coder->thread_error == LZMA_OK 70153200025SRui Paulo && !timed_out) { 70253200025SRui Paulo if (coder->timeout != 0) 70353200025SRui Paulo timed_out = mythread_cond_timedwait( 70453200025SRui Paulo &coder->cond, &coder->mutex, 70553200025SRui Paulo wait_abs) != 0; 70653200025SRui Paulo else 70753200025SRui Paulo mythread_cond_wait(&coder->cond, 70853200025SRui Paulo &coder->mutex); 70953200025SRui Paulo } 71053200025SRui Paulo } 71153200025SRui Paulo 71253200025SRui Paulo return timed_out; 71353200025SRui Paulo } 71453200025SRui Paulo 71553200025SRui Paulo 71653200025SRui Paulo static lzma_ret 7171456f0f9SXin LI stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator, 71853200025SRui Paulo const uint8_t *restrict in, size_t *restrict in_pos, 71953200025SRui Paulo size_t in_size, uint8_t *restrict out, 72053200025SRui Paulo size_t *restrict out_pos, size_t out_size, lzma_action action) 72153200025SRui Paulo { 7221456f0f9SXin LI lzma_stream_coder *coder = coder_ptr; 7231456f0f9SXin LI 72453200025SRui Paulo switch (coder->sequence) { 72553200025SRui Paulo case SEQ_STREAM_HEADER: 72653200025SRui Paulo lzma_bufcpy(coder->header, &coder->header_pos, 72753200025SRui Paulo sizeof(coder->header), 72853200025SRui Paulo out, out_pos, out_size); 72953200025SRui Paulo if (coder->header_pos < sizeof(coder->header)) 73053200025SRui Paulo return LZMA_OK; 73153200025SRui Paulo 73253200025SRui Paulo coder->header_pos = 0; 73353200025SRui Paulo coder->sequence = SEQ_BLOCK; 73453200025SRui Paulo 73553200025SRui Paulo // Fall through 73653200025SRui Paulo 73753200025SRui Paulo case SEQ_BLOCK: { 73853200025SRui Paulo // Initialized to silence warnings. 73953200025SRui Paulo lzma_vli unpadded_size = 0; 74053200025SRui Paulo lzma_vli uncompressed_size = 0; 74153200025SRui Paulo lzma_ret ret = LZMA_OK; 74253200025SRui Paulo 74353200025SRui Paulo // These are for wait_for_work(). 74453200025SRui Paulo bool has_blocked = false; 7451f3ced26SXin LI mythread_condtime wait_abs = { 0 }; 74653200025SRui Paulo 74753200025SRui Paulo while (true) { 74853200025SRui Paulo mythread_sync(coder->mutex) { 74953200025SRui Paulo // Check for Block encoder errors. 75053200025SRui Paulo ret = coder->thread_error; 75153200025SRui Paulo if (ret != LZMA_OK) { 75253200025SRui Paulo assert(ret != LZMA_STREAM_END); 753a8675d92SXin LI break; // Break out of mythread_sync. 75453200025SRui Paulo } 75553200025SRui Paulo 75653200025SRui Paulo // Try to read compressed data to out[]. 75773ed8e77SXin LI ret = lzma_outq_read(&coder->outq, allocator, 75853200025SRui Paulo out, out_pos, out_size, 75953200025SRui Paulo &unpadded_size, 76053200025SRui Paulo &uncompressed_size); 76153200025SRui Paulo } 76253200025SRui Paulo 76353200025SRui Paulo if (ret == LZMA_STREAM_END) { 76453200025SRui Paulo // End of Block. Add it to the Index. 76553200025SRui Paulo ret = lzma_index_append(coder->index, 76653200025SRui Paulo allocator, unpadded_size, 76753200025SRui Paulo uncompressed_size); 7689e6bbe47SXin LI if (ret != LZMA_OK) { 7699e6bbe47SXin LI threads_stop(coder, false); 7709e6bbe47SXin LI return ret; 7719e6bbe47SXin LI } 77253200025SRui Paulo 77353200025SRui Paulo // If we didn't fill the output buffer yet, 77453200025SRui Paulo // try to read more data. Maybe the next 77553200025SRui Paulo // outbuf has been finished already too. 77653200025SRui Paulo if (*out_pos < out_size) 77753200025SRui Paulo continue; 77853200025SRui Paulo } 77953200025SRui Paulo 78053200025SRui Paulo if (ret != LZMA_OK) { 7819e6bbe47SXin LI // coder->thread_error was set. 78253200025SRui Paulo threads_stop(coder, false); 78353200025SRui Paulo return ret; 78453200025SRui Paulo } 78553200025SRui Paulo 78653200025SRui Paulo // Try to give uncompressed data to a worker thread. 78753200025SRui Paulo ret = stream_encode_in(coder, allocator, 78853200025SRui Paulo in, in_pos, in_size, action); 78953200025SRui Paulo if (ret != LZMA_OK) { 79053200025SRui Paulo threads_stop(coder, false); 79153200025SRui Paulo return ret; 79253200025SRui Paulo } 79353200025SRui Paulo 79453200025SRui Paulo // See if we should wait or return. 79553200025SRui Paulo // 79653200025SRui Paulo // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER. 79753200025SRui Paulo if (*in_pos == in_size) { 79853200025SRui Paulo // LZMA_RUN: More data is probably coming 79953200025SRui Paulo // so return to let the caller fill the 80053200025SRui Paulo // input buffer. 80153200025SRui Paulo if (action == LZMA_RUN) 80253200025SRui Paulo return LZMA_OK; 80353200025SRui Paulo 80453200025SRui Paulo // LZMA_FULL_BARRIER: The same as with 80553200025SRui Paulo // LZMA_RUN but tell the caller that the 80653200025SRui Paulo // barrier was completed. 80753200025SRui Paulo if (action == LZMA_FULL_BARRIER) 80853200025SRui Paulo return LZMA_STREAM_END; 80953200025SRui Paulo 81053200025SRui Paulo // Finishing or flushing isn't completed until 81153200025SRui Paulo // all input data has been encoded and copied 81253200025SRui Paulo // to the output buffer. 81353200025SRui Paulo if (lzma_outq_is_empty(&coder->outq)) { 81453200025SRui Paulo // LZMA_FINISH: Continue to encode 81553200025SRui Paulo // the Index field. 81653200025SRui Paulo if (action == LZMA_FINISH) 81753200025SRui Paulo break; 81853200025SRui Paulo 81953200025SRui Paulo // LZMA_FULL_FLUSH: Return to tell 82053200025SRui Paulo // the caller that flushing was 82153200025SRui Paulo // completed. 82253200025SRui Paulo if (action == LZMA_FULL_FLUSH) 82353200025SRui Paulo return LZMA_STREAM_END; 82453200025SRui Paulo } 82553200025SRui Paulo } 82653200025SRui Paulo 82753200025SRui Paulo // Return if there is no output space left. 82853200025SRui Paulo // This check must be done after testing the input 82953200025SRui Paulo // buffer, because we might want to use a different 83053200025SRui Paulo // return code. 83153200025SRui Paulo if (*out_pos == out_size) 83253200025SRui Paulo return LZMA_OK; 83353200025SRui Paulo 83453200025SRui Paulo // Neither in nor out has been used completely. 83553200025SRui Paulo // Wait until there's something we can do. 83653200025SRui Paulo if (wait_for_work(coder, &wait_abs, &has_blocked, 83753200025SRui Paulo *in_pos < in_size)) 83853200025SRui Paulo return LZMA_TIMED_OUT; 83953200025SRui Paulo } 84053200025SRui Paulo 84153200025SRui Paulo // All Blocks have been encoded and the threads have stopped. 84253200025SRui Paulo // Prepare to encode the Index field. 84353200025SRui Paulo return_if_error(lzma_index_encoder_init( 84453200025SRui Paulo &coder->index_encoder, allocator, 84553200025SRui Paulo coder->index)); 84653200025SRui Paulo coder->sequence = SEQ_INDEX; 84753200025SRui Paulo 84853200025SRui Paulo // Update the progress info to take the Index and 84953200025SRui Paulo // Stream Footer into account. Those are very fast to encode 85053200025SRui Paulo // so in terms of progress information they can be thought 85153200025SRui Paulo // to be ready to be copied out. 85253200025SRui Paulo coder->progress_out += lzma_index_size(coder->index) 85353200025SRui Paulo + LZMA_STREAM_HEADER_SIZE; 85453200025SRui Paulo } 85553200025SRui Paulo 85653200025SRui Paulo // Fall through 85753200025SRui Paulo 85853200025SRui Paulo case SEQ_INDEX: { 85953200025SRui Paulo // Call the Index encoder. It doesn't take any input, so 86053200025SRui Paulo // those pointers can be NULL. 86153200025SRui Paulo const lzma_ret ret = coder->index_encoder.code( 86253200025SRui Paulo coder->index_encoder.coder, allocator, 86353200025SRui Paulo NULL, NULL, 0, 86453200025SRui Paulo out, out_pos, out_size, LZMA_RUN); 86553200025SRui Paulo if (ret != LZMA_STREAM_END) 86653200025SRui Paulo return ret; 86753200025SRui Paulo 86853200025SRui Paulo // Encode the Stream Footer into coder->buffer. 86953200025SRui Paulo coder->stream_flags.backward_size 87053200025SRui Paulo = lzma_index_size(coder->index); 87153200025SRui Paulo if (lzma_stream_footer_encode(&coder->stream_flags, 87253200025SRui Paulo coder->header) != LZMA_OK) 87353200025SRui Paulo return LZMA_PROG_ERROR; 87453200025SRui Paulo 87553200025SRui Paulo coder->sequence = SEQ_STREAM_FOOTER; 87653200025SRui Paulo } 87753200025SRui Paulo 87853200025SRui Paulo // Fall through 87953200025SRui Paulo 88053200025SRui Paulo case SEQ_STREAM_FOOTER: 88153200025SRui Paulo lzma_bufcpy(coder->header, &coder->header_pos, 88253200025SRui Paulo sizeof(coder->header), 88353200025SRui Paulo out, out_pos, out_size); 88453200025SRui Paulo return coder->header_pos < sizeof(coder->header) 88553200025SRui Paulo ? LZMA_OK : LZMA_STREAM_END; 88653200025SRui Paulo } 88753200025SRui Paulo 88853200025SRui Paulo assert(0); 88953200025SRui Paulo return LZMA_PROG_ERROR; 89053200025SRui Paulo } 89153200025SRui Paulo 89253200025SRui Paulo 89353200025SRui Paulo static void 8941456f0f9SXin LI stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator) 89553200025SRui Paulo { 8961456f0f9SXin LI lzma_stream_coder *coder = coder_ptr; 8971456f0f9SXin LI 89853200025SRui Paulo // Threads must be killed before the output queue can be freed. 89953200025SRui Paulo threads_end(coder, allocator); 90053200025SRui Paulo lzma_outq_end(&coder->outq, allocator); 90153200025SRui Paulo 90273ed8e77SXin LI lzma_filters_free(coder->filters, allocator); 90373ed8e77SXin LI lzma_filters_free(coder->filters_cache, allocator); 90453200025SRui Paulo 90553200025SRui Paulo lzma_next_end(&coder->index_encoder, allocator); 90653200025SRui Paulo lzma_index_end(coder->index, allocator); 90753200025SRui Paulo 90853200025SRui Paulo mythread_cond_destroy(&coder->cond); 90953200025SRui Paulo mythread_mutex_destroy(&coder->mutex); 91053200025SRui Paulo 91153200025SRui Paulo lzma_free(coder, allocator); 91253200025SRui Paulo return; 91353200025SRui Paulo } 91453200025SRui Paulo 91553200025SRui Paulo 91673ed8e77SXin LI static lzma_ret 91773ed8e77SXin LI stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator, 91873ed8e77SXin LI const lzma_filter *filters, 91973ed8e77SXin LI const lzma_filter *reversed_filters 92073ed8e77SXin LI lzma_attribute((__unused__))) 92173ed8e77SXin LI { 92273ed8e77SXin LI lzma_stream_coder *coder = coder_ptr; 92373ed8e77SXin LI 92473ed8e77SXin LI // Applications shouldn't attempt to change the options when 92573ed8e77SXin LI // we are already encoding the Index or Stream Footer. 92673ed8e77SXin LI if (coder->sequence > SEQ_BLOCK) 92773ed8e77SXin LI return LZMA_PROG_ERROR; 92873ed8e77SXin LI 92973ed8e77SXin LI // For now the threaded encoder doesn't support changing 93073ed8e77SXin LI // the options in the middle of a Block. 93173ed8e77SXin LI if (coder->thr != NULL) 93273ed8e77SXin LI return LZMA_PROG_ERROR; 93373ed8e77SXin LI 93473ed8e77SXin LI // Check if the filter chain seems mostly valid. See the comment 93573ed8e77SXin LI // in stream_encoder_mt_init(). 93673ed8e77SXin LI if (lzma_raw_encoder_memusage(filters) == UINT64_MAX) 93773ed8e77SXin LI return LZMA_OPTIONS_ERROR; 93873ed8e77SXin LI 93973ed8e77SXin LI // Make a copy to a temporary buffer first. This way the encoder 94073ed8e77SXin LI // state stays unchanged if an error occurs in lzma_filters_copy(). 94173ed8e77SXin LI lzma_filter temp[LZMA_FILTERS_MAX + 1]; 94273ed8e77SXin LI return_if_error(lzma_filters_copy(filters, temp, allocator)); 94373ed8e77SXin LI 94473ed8e77SXin LI // Free the options of the old chain as well as the cache. 94573ed8e77SXin LI lzma_filters_free(coder->filters, allocator); 94673ed8e77SXin LI lzma_filters_free(coder->filters_cache, allocator); 94773ed8e77SXin LI 94873ed8e77SXin LI // Copy the new filter chain in place. 94973ed8e77SXin LI memcpy(coder->filters, temp, sizeof(temp)); 95073ed8e77SXin LI 95173ed8e77SXin LI return LZMA_OK; 95273ed8e77SXin LI } 95373ed8e77SXin LI 95473ed8e77SXin LI 95553200025SRui Paulo /// Options handling for lzma_stream_encoder_mt_init() and 95653200025SRui Paulo /// lzma_stream_encoder_mt_memusage() 95753200025SRui Paulo static lzma_ret 95853200025SRui Paulo get_options(const lzma_mt *options, lzma_options_easy *opt_easy, 95953200025SRui Paulo const lzma_filter **filters, uint64_t *block_size, 96053200025SRui Paulo uint64_t *outbuf_size_max) 96153200025SRui Paulo { 96253200025SRui Paulo // Validate some of the options. 96353200025SRui Paulo if (options == NULL) 96453200025SRui Paulo return LZMA_PROG_ERROR; 96553200025SRui Paulo 96653200025SRui Paulo if (options->flags != 0 || options->threads == 0 96753200025SRui Paulo || options->threads > LZMA_THREADS_MAX) 96853200025SRui Paulo return LZMA_OPTIONS_ERROR; 96953200025SRui Paulo 97053200025SRui Paulo if (options->filters != NULL) { 97153200025SRui Paulo // Filter chain was given, use it as is. 97253200025SRui Paulo *filters = options->filters; 97353200025SRui Paulo } else { 97453200025SRui Paulo // Use a preset. 97553200025SRui Paulo if (lzma_easy_preset(opt_easy, options->preset)) 97653200025SRui Paulo return LZMA_OPTIONS_ERROR; 97753200025SRui Paulo 97853200025SRui Paulo *filters = opt_easy->filters; 97953200025SRui Paulo } 98053200025SRui Paulo 9815ffb19acSXin LI // If the Block size is not set, determine it from the filter chain. 9825ffb19acSXin LI if (options->block_size > 0) 9832f9cd13dSXin LI *block_size = options->block_size; 9845ffb19acSXin LI else 9852f9cd13dSXin LI *block_size = lzma_mt_block_size(*filters); 9862f9cd13dSXin LI 9875ffb19acSXin LI // UINT64_MAX > BLOCK_SIZE_MAX, so the second condition 9885ffb19acSXin LI // should be optimized out by any reasonable compiler. 9895ffb19acSXin LI // The second condition should be there in the unlikely event that 9905ffb19acSXin LI // the macros change and UINT64_MAX < BLOCK_SIZE_MAX. 9915ffb19acSXin LI if (*block_size > BLOCK_SIZE_MAX || *block_size == UINT64_MAX) 9925ffb19acSXin LI return LZMA_OPTIONS_ERROR; 9932f9cd13dSXin LI 99453200025SRui Paulo // Calculate the maximum amount output that a single output buffer 99553200025SRui Paulo // may need to hold. This is the same as the maximum total size of 99653200025SRui Paulo // a Block. 99753200025SRui Paulo *outbuf_size_max = lzma_block_buffer_bound64(*block_size); 99853200025SRui Paulo if (*outbuf_size_max == 0) 99953200025SRui Paulo return LZMA_MEM_ERROR; 100053200025SRui Paulo 100153200025SRui Paulo return LZMA_OK; 100253200025SRui Paulo } 100353200025SRui Paulo 100453200025SRui Paulo 100553200025SRui Paulo static void 10061456f0f9SXin LI get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out) 100753200025SRui Paulo { 10081456f0f9SXin LI lzma_stream_coder *coder = coder_ptr; 10091456f0f9SXin LI 101053200025SRui Paulo // Lock coder->mutex to prevent finishing threads from moving their 10111456f0f9SXin LI // progress info from the worker_thread structure to lzma_stream_coder. 101253200025SRui Paulo mythread_sync(coder->mutex) { 101353200025SRui Paulo *progress_in = coder->progress_in; 101453200025SRui Paulo *progress_out = coder->progress_out; 101553200025SRui Paulo 101653200025SRui Paulo for (size_t i = 0; i < coder->threads_initialized; ++i) { 101753200025SRui Paulo mythread_sync(coder->threads[i].mutex) { 101853200025SRui Paulo *progress_in += coder->threads[i].progress_in; 101953200025SRui Paulo *progress_out += coder->threads[i] 102053200025SRui Paulo .progress_out; 102153200025SRui Paulo } 102253200025SRui Paulo } 102353200025SRui Paulo } 102453200025SRui Paulo 102553200025SRui Paulo return; 102653200025SRui Paulo } 102753200025SRui Paulo 102853200025SRui Paulo 102953200025SRui Paulo static lzma_ret 103053200025SRui Paulo stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, 103153200025SRui Paulo const lzma_mt *options) 103253200025SRui Paulo { 103353200025SRui Paulo lzma_next_coder_init(&stream_encoder_mt_init, next, allocator); 103453200025SRui Paulo 103553200025SRui Paulo // Get the filter chain. 103653200025SRui Paulo lzma_options_easy easy; 103753200025SRui Paulo const lzma_filter *filters; 103853200025SRui Paulo uint64_t block_size; 103953200025SRui Paulo uint64_t outbuf_size_max; 104053200025SRui Paulo return_if_error(get_options(options, &easy, &filters, 104153200025SRui Paulo &block_size, &outbuf_size_max)); 104253200025SRui Paulo 104353200025SRui Paulo #if SIZE_MAX < UINT64_MAX 104473ed8e77SXin LI if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX) 104553200025SRui Paulo return LZMA_MEM_ERROR; 104653200025SRui Paulo #endif 104753200025SRui Paulo 104853200025SRui Paulo // Validate the filter chain so that we can give an error in this 104953200025SRui Paulo // function instead of delaying it to the first call to lzma_code(). 105053200025SRui Paulo // The memory usage calculation verifies the filter chain as 105173ed8e77SXin LI // a side effect so we take advantage of that. It's not a perfect 105273ed8e77SXin LI // check though as raw encoder allows LZMA1 too but such problems 105373ed8e77SXin LI // will be caught eventually with Block Header encoder. 105453200025SRui Paulo if (lzma_raw_encoder_memusage(filters) == UINT64_MAX) 105553200025SRui Paulo return LZMA_OPTIONS_ERROR; 105653200025SRui Paulo 105753200025SRui Paulo // Validate the Check ID. 105853200025SRui Paulo if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX) 105953200025SRui Paulo return LZMA_PROG_ERROR; 106053200025SRui Paulo 106153200025SRui Paulo if (!lzma_check_is_supported(options->check)) 106253200025SRui Paulo return LZMA_UNSUPPORTED_CHECK; 106353200025SRui Paulo 106453200025SRui Paulo // Allocate and initialize the base structure if needed. 10651456f0f9SXin LI lzma_stream_coder *coder = next->coder; 10661456f0f9SXin LI if (coder == NULL) { 10671456f0f9SXin LI coder = lzma_alloc(sizeof(lzma_stream_coder), allocator); 10681456f0f9SXin LI if (coder == NULL) 106953200025SRui Paulo return LZMA_MEM_ERROR; 107053200025SRui Paulo 10711456f0f9SXin LI next->coder = coder; 10721456f0f9SXin LI 107353200025SRui Paulo // For the mutex and condition variable initializations 107453200025SRui Paulo // the error handling has to be done here because 107553200025SRui Paulo // stream_encoder_mt_end() doesn't know if they have 107653200025SRui Paulo // already been initialized or not. 10771456f0f9SXin LI if (mythread_mutex_init(&coder->mutex)) { 10781456f0f9SXin LI lzma_free(coder, allocator); 107953200025SRui Paulo next->coder = NULL; 108053200025SRui Paulo return LZMA_MEM_ERROR; 108153200025SRui Paulo } 108253200025SRui Paulo 10831456f0f9SXin LI if (mythread_cond_init(&coder->cond)) { 10841456f0f9SXin LI mythread_mutex_destroy(&coder->mutex); 10851456f0f9SXin LI lzma_free(coder, allocator); 108653200025SRui Paulo next->coder = NULL; 108753200025SRui Paulo return LZMA_MEM_ERROR; 108853200025SRui Paulo } 108953200025SRui Paulo 109053200025SRui Paulo next->code = &stream_encode_mt; 109153200025SRui Paulo next->end = &stream_encoder_mt_end; 109253200025SRui Paulo next->get_progress = &get_progress; 109373ed8e77SXin LI next->update = &stream_encoder_mt_update; 109453200025SRui Paulo 10951456f0f9SXin LI coder->filters[0].id = LZMA_VLI_UNKNOWN; 109673ed8e77SXin LI coder->filters_cache[0].id = LZMA_VLI_UNKNOWN; 10971456f0f9SXin LI coder->index_encoder = LZMA_NEXT_CODER_INIT; 10981456f0f9SXin LI coder->index = NULL; 10991456f0f9SXin LI memzero(&coder->outq, sizeof(coder->outq)); 11001456f0f9SXin LI coder->threads = NULL; 11011456f0f9SXin LI coder->threads_max = 0; 11021456f0f9SXin LI coder->threads_initialized = 0; 110353200025SRui Paulo } 110453200025SRui Paulo 110553200025SRui Paulo // Basic initializations 11061456f0f9SXin LI coder->sequence = SEQ_STREAM_HEADER; 11071456f0f9SXin LI coder->block_size = (size_t)(block_size); 110873ed8e77SXin LI coder->outbuf_alloc_size = (size_t)(outbuf_size_max); 11091456f0f9SXin LI coder->thread_error = LZMA_OK; 11101456f0f9SXin LI coder->thr = NULL; 111153200025SRui Paulo 111253200025SRui Paulo // Allocate the thread-specific base structures. 111353200025SRui Paulo assert(options->threads > 0); 11141456f0f9SXin LI if (coder->threads_max != options->threads) { 11151456f0f9SXin LI threads_end(coder, allocator); 111653200025SRui Paulo 11171456f0f9SXin LI coder->threads = NULL; 11181456f0f9SXin LI coder->threads_max = 0; 111953200025SRui Paulo 11201456f0f9SXin LI coder->threads_initialized = 0; 11211456f0f9SXin LI coder->threads_free = NULL; 112253200025SRui Paulo 11231456f0f9SXin LI coder->threads = lzma_alloc( 112453200025SRui Paulo options->threads * sizeof(worker_thread), 112553200025SRui Paulo allocator); 11261456f0f9SXin LI if (coder->threads == NULL) 112753200025SRui Paulo return LZMA_MEM_ERROR; 112853200025SRui Paulo 11291456f0f9SXin LI coder->threads_max = options->threads; 113053200025SRui Paulo } else { 113153200025SRui Paulo // Reuse the old structures and threads. Tell the running 113253200025SRui Paulo // threads to stop and wait until they have stopped. 11331456f0f9SXin LI threads_stop(coder, true); 113453200025SRui Paulo } 113553200025SRui Paulo 113653200025SRui Paulo // Output queue 11371456f0f9SXin LI return_if_error(lzma_outq_init(&coder->outq, allocator, 113873ed8e77SXin LI options->threads)); 113953200025SRui Paulo 114053200025SRui Paulo // Timeout 11411456f0f9SXin LI coder->timeout = options->timeout; 114253200025SRui Paulo 114373ed8e77SXin LI // Free the old filter chain and the cache. 114473ed8e77SXin LI lzma_filters_free(coder->filters, allocator); 114573ed8e77SXin LI lzma_filters_free(coder->filters_cache, allocator); 114653200025SRui Paulo 114773ed8e77SXin LI // Copy the new filter chain. 114853200025SRui Paulo return_if_error(lzma_filters_copy( 11491456f0f9SXin LI filters, coder->filters, allocator)); 115053200025SRui Paulo 115153200025SRui Paulo // Index 11521456f0f9SXin LI lzma_index_end(coder->index, allocator); 11531456f0f9SXin LI coder->index = lzma_index_init(allocator); 11541456f0f9SXin LI if (coder->index == NULL) 115553200025SRui Paulo return LZMA_MEM_ERROR; 115653200025SRui Paulo 115753200025SRui Paulo // Stream Header 11581456f0f9SXin LI coder->stream_flags.version = 0; 11591456f0f9SXin LI coder->stream_flags.check = options->check; 116053200025SRui Paulo return_if_error(lzma_stream_header_encode( 11611456f0f9SXin LI &coder->stream_flags, coder->header)); 116253200025SRui Paulo 11631456f0f9SXin LI coder->header_pos = 0; 116453200025SRui Paulo 116553200025SRui Paulo // Progress info 11661456f0f9SXin LI coder->progress_in = 0; 11671456f0f9SXin LI coder->progress_out = LZMA_STREAM_HEADER_SIZE; 116853200025SRui Paulo 116953200025SRui Paulo return LZMA_OK; 117053200025SRui Paulo } 117153200025SRui Paulo 117253200025SRui Paulo 11739e6bbe47SXin LI #ifdef HAVE_SYMBOL_VERSIONS_LINUX 11749e6bbe47SXin LI // These are for compatibility with binaries linked against liblzma that 11759e6bbe47SXin LI // has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7. 11769e6bbe47SXin LI // Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2 11779e6bbe47SXin LI // but it has been added here anyway since someone might misread the 11789e6bbe47SXin LI // RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist. 11799e6bbe47SXin LI LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha", 11809e6bbe47SXin LI lzma_ret, lzma_stream_encoder_mt_512a)( 11819e6bbe47SXin LI lzma_stream *strm, const lzma_mt *options) 11829e6bbe47SXin LI lzma_nothrow lzma_attr_warn_unused_result 11839e6bbe47SXin LI __attribute__((__alias__("lzma_stream_encoder_mt_52"))); 11849e6bbe47SXin LI 11859e6bbe47SXin LI LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2", 11869e6bbe47SXin LI lzma_ret, lzma_stream_encoder_mt_522)( 11879e6bbe47SXin LI lzma_stream *strm, const lzma_mt *options) 11889e6bbe47SXin LI lzma_nothrow lzma_attr_warn_unused_result 11899e6bbe47SXin LI __attribute__((__alias__("lzma_stream_encoder_mt_52"))); 11909e6bbe47SXin LI 11919e6bbe47SXin LI LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2", 11929e6bbe47SXin LI lzma_ret, lzma_stream_encoder_mt_52)( 11939e6bbe47SXin LI lzma_stream *strm, const lzma_mt *options) 11949e6bbe47SXin LI lzma_nothrow lzma_attr_warn_unused_result; 11959e6bbe47SXin LI 11969e6bbe47SXin LI #define lzma_stream_encoder_mt lzma_stream_encoder_mt_52 11979e6bbe47SXin LI #endif 119853200025SRui Paulo extern LZMA_API(lzma_ret) 119953200025SRui Paulo lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options) 120053200025SRui Paulo { 120153200025SRui Paulo lzma_next_strm_init(stream_encoder_mt_init, strm, options); 120253200025SRui Paulo 120353200025SRui Paulo strm->internal->supported_actions[LZMA_RUN] = true; 120453200025SRui Paulo // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true; 120553200025SRui Paulo strm->internal->supported_actions[LZMA_FULL_FLUSH] = true; 120653200025SRui Paulo strm->internal->supported_actions[LZMA_FULL_BARRIER] = true; 120753200025SRui Paulo strm->internal->supported_actions[LZMA_FINISH] = true; 120853200025SRui Paulo 120953200025SRui Paulo return LZMA_OK; 121053200025SRui Paulo } 121153200025SRui Paulo 121253200025SRui Paulo 12139e6bbe47SXin LI #ifdef HAVE_SYMBOL_VERSIONS_LINUX 12149e6bbe47SXin LI LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha", 12159e6bbe47SXin LI uint64_t, lzma_stream_encoder_mt_memusage_512a)( 12169e6bbe47SXin LI const lzma_mt *options) lzma_nothrow lzma_attr_pure 12179e6bbe47SXin LI __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52"))); 12189e6bbe47SXin LI 12199e6bbe47SXin LI LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2", 12209e6bbe47SXin LI uint64_t, lzma_stream_encoder_mt_memusage_522)( 12219e6bbe47SXin LI const lzma_mt *options) lzma_nothrow lzma_attr_pure 12229e6bbe47SXin LI __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52"))); 12239e6bbe47SXin LI 12249e6bbe47SXin LI LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2", 12259e6bbe47SXin LI uint64_t, lzma_stream_encoder_mt_memusage_52)( 12269e6bbe47SXin LI const lzma_mt *options) lzma_nothrow lzma_attr_pure; 12279e6bbe47SXin LI 12289e6bbe47SXin LI #define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52 12299e6bbe47SXin LI #endif 123053200025SRui Paulo // This function name is a monster but it's consistent with the older 123153200025SRui Paulo // monster names. :-( 31 chars is the max that C99 requires so in that 123253200025SRui Paulo // sense it's not too long. ;-) 123353200025SRui Paulo extern LZMA_API(uint64_t) 123453200025SRui Paulo lzma_stream_encoder_mt_memusage(const lzma_mt *options) 123553200025SRui Paulo { 123653200025SRui Paulo lzma_options_easy easy; 123753200025SRui Paulo const lzma_filter *filters; 123853200025SRui Paulo uint64_t block_size; 123953200025SRui Paulo uint64_t outbuf_size_max; 124053200025SRui Paulo 124153200025SRui Paulo if (get_options(options, &easy, &filters, &block_size, 124253200025SRui Paulo &outbuf_size_max) != LZMA_OK) 124353200025SRui Paulo return UINT64_MAX; 124453200025SRui Paulo 124553200025SRui Paulo // Memory usage of the input buffers 124653200025SRui Paulo const uint64_t inbuf_memusage = options->threads * block_size; 124753200025SRui Paulo 124853200025SRui Paulo // Memory usage of the filter encoders 124953200025SRui Paulo uint64_t filters_memusage = lzma_raw_encoder_memusage(filters); 125053200025SRui Paulo if (filters_memusage == UINT64_MAX) 125153200025SRui Paulo return UINT64_MAX; 125253200025SRui Paulo 125353200025SRui Paulo filters_memusage *= options->threads; 125453200025SRui Paulo 125553200025SRui Paulo // Memory usage of the output queue 125653200025SRui Paulo const uint64_t outq_memusage = lzma_outq_memusage( 125753200025SRui Paulo outbuf_size_max, options->threads); 125853200025SRui Paulo if (outq_memusage == UINT64_MAX) 125953200025SRui Paulo return UINT64_MAX; 126053200025SRui Paulo 126153200025SRui Paulo // Sum them with overflow checking. 12621456f0f9SXin LI uint64_t total_memusage = LZMA_MEMUSAGE_BASE 12631456f0f9SXin LI + sizeof(lzma_stream_coder) 126453200025SRui Paulo + options->threads * sizeof(worker_thread); 126553200025SRui Paulo 126653200025SRui Paulo if (UINT64_MAX - total_memusage < inbuf_memusage) 126753200025SRui Paulo return UINT64_MAX; 126853200025SRui Paulo 126953200025SRui Paulo total_memusage += inbuf_memusage; 127053200025SRui Paulo 127153200025SRui Paulo if (UINT64_MAX - total_memusage < filters_memusage) 127253200025SRui Paulo return UINT64_MAX; 127353200025SRui Paulo 127453200025SRui Paulo total_memusage += filters_memusage; 127553200025SRui Paulo 127653200025SRui Paulo if (UINT64_MAX - total_memusage < outq_memusage) 127753200025SRui Paulo return UINT64_MAX; 127853200025SRui Paulo 127953200025SRui Paulo return total_memusage + outq_memusage; 128053200025SRui Paulo } 1281