// xref: /freebsd/contrib/xz/src/liblzma/common/stream_decoder_mt.c (revision ae12432049e7873ab3912643ae5d08297b8cbc49)
// SPDX-License-Identifier: 0BSD

///////////////////////////////////////////////////////////////////////////////
//
/// \file       stream_decoder_mt.c
/// \brief      Multithreaded .xz Stream decoder
//
//  Authors:    Sebastian Andrzej Siewior
//              Lasse Collin
//
///////////////////////////////////////////////////////////////////////////////
12 
13 #include "common.h"
14 #include "block_decoder.h"
15 #include "stream_decoder.h"
16 #include "index.h"
17 #include "outqueue.h"
18 
19 
/// State of a worker thread. This is stored in worker_thread.state,
/// which is protected by the worker's own mutex.
typedef enum {
	/// Waiting for work.
	/// Main thread may change this to THR_RUN or THR_EXIT.
	THR_IDLE,

	/// Decoding is in progress.
	/// Main thread may change this to THR_IDLE or THR_EXIT.
	/// The worker thread may change this to THR_IDLE.
	THR_RUN,

	/// The main thread wants the thread to exit.
	THR_EXIT,

} worker_state;
34 
35 
36 struct worker_thread {
37 	/// Worker state is protected with our mutex.
38 	worker_state state;
39 
40 	/// Input buffer that will contain the whole Block except Block Header.
41 	uint8_t *in;
42 
43 	/// Amount of memory allocated for "in"
44 	size_t in_size;
45 
46 	/// Number of bytes written to "in" by the main thread
47 	size_t in_filled;
48 
49 	/// Number of bytes consumed from "in" by the worker thread.
50 	size_t in_pos;
51 
52 	/// Amount of uncompressed data that has been decoded. This local
53 	/// copy is needed because updating outbuf->pos requires locking
54 	/// the main mutex (coder->mutex).
55 	size_t out_pos;
56 
57 	/// Pointer to the main structure is needed to (1) lock the main
58 	/// mutex (coder->mutex) when updating outbuf->pos and (2) when
59 	/// putting this thread back to the stack of free threads.
60 	struct lzma_stream_coder *coder;
61 
62 	/// The allocator is set by the main thread. Since a copy of the
63 	/// pointer is kept here, the application must not change the
64 	/// allocator before calling lzma_end().
65 	const lzma_allocator *allocator;
66 
67 	/// Output queue buffer to which the uncompressed data is written.
68 	lzma_outbuf *outbuf;
69 
70 	/// Amount of compressed data that has already been decompressed.
71 	/// This is updated from in_pos when our mutex is locked.
72 	/// This is size_t, not uint64_t, because per-thread progress
73 	/// is limited to sizes of allocated buffers.
74 	size_t progress_in;
75 
76 	/// Like progress_in but for uncompressed data.
77 	size_t progress_out;
78 
79 	/// Updating outbuf->pos requires locking the main mutex
80 	/// (coder->mutex). Since the main thread will only read output
81 	/// from the oldest outbuf in the queue, only the worker thread
82 	/// that is associated with the oldest outbuf needs to update its
83 	/// outbuf->pos. This avoids useless mutex contention that would
84 	/// happen if all worker threads were frequently locking the main
85 	/// mutex to update their outbuf->pos.
86 	///
87 	/// When partial_update_enabled is true, this worker thread will
88 	/// update outbuf->pos and outbuf->decoder_in_pos after each call
89 	/// to the Block decoder. This is initially false. Main thread may
90 	/// set this to true.
91 	bool partial_update_enabled;
92 
93 	/// Once the main thread has set partial_updated_enabled = true,
94 	/// we will do the first partial update as soon as we can and
95 	/// set partial_update_started = true. After the first update,
96 	/// we only update if we have made progress. This avoids useless
97 	/// locking of thr->coder->mutex.
98 	bool partial_update_started;
99 
100 	/// Block decoder
101 	lzma_next_coder block_decoder;
102 
103 	/// Thread-specific Block options are needed because the Block
104 	/// decoder modifies the struct given to it at initialization.
105 	lzma_block block_options;
106 
107 	/// Filter chain memory usage
108 	uint64_t mem_filters;
109 
110 	/// Next structure in the stack of free worker threads.
111 	struct worker_thread *next;
112 
113 	mythread_mutex mutex;
114 	mythread_cond cond;
115 
116 	/// The ID of this thread is used to join the thread
117 	/// when it's not needed anymore.
118 	mythread thread_id;
119 };
120 
121 
122 struct lzma_stream_coder {
123 	enum {
124 		SEQ_STREAM_HEADER,
125 		SEQ_BLOCK_HEADER,
126 		SEQ_BLOCK_INIT,
127 		SEQ_BLOCK_THR_INIT,
128 		SEQ_BLOCK_THR_RUN,
129 		SEQ_BLOCK_DIRECT_INIT,
130 		SEQ_BLOCK_DIRECT_RUN,
131 		SEQ_INDEX_WAIT_OUTPUT,
132 		SEQ_INDEX_DECODE,
133 		SEQ_STREAM_FOOTER,
134 		SEQ_STREAM_PADDING,
135 		SEQ_ERROR,
136 	} sequence;
137 
138 	/// Block decoder
139 	lzma_next_coder block_decoder;
140 
141 	/// Every Block Header will be decoded into this structure.
142 	/// This is also used to initialize a Block decoder when in
143 	/// direct mode. In threaded mode, a thread-specific copy will
144 	/// be made for decoder initialization because the Block decoder
145 	/// will modify the structure given to it.
146 	lzma_block block_options;
147 
148 	/// Buffer to hold a filter chain for Block Header decoding and
149 	/// initialization. These are freed after successful Block decoder
150 	/// initialization or at stream_decoder_mt_end(). The thread-specific
151 	/// copy of block_options won't hold a pointer to filters[] after
152 	/// initialization.
153 	lzma_filter filters[LZMA_FILTERS_MAX + 1];
154 
155 	/// Stream Flags from Stream Header
156 	lzma_stream_flags stream_flags;
157 
158 	/// Index is hashed so that it can be compared to the sizes of Blocks
159 	/// with O(1) memory usage.
160 	lzma_index_hash *index_hash;
161 
162 
163 	/// Maximum wait time if cannot use all the input and cannot
164 	/// fill the output buffer. This is in milliseconds.
165 	uint32_t timeout;
166 
167 
168 	/// Error code from a worker thread.
169 	///
170 	/// \note       Use mutex.
171 	lzma_ret thread_error;
172 
173 	/// Error code to return after pending output has been copied out. If
174 	/// set in read_output_and_wait(), this is a mirror of thread_error.
175 	/// If set in stream_decode_mt() then it's, for example, error that
176 	/// occurred when decoding Block Header.
177 	lzma_ret pending_error;
178 
179 	/// Number of threads that will be created at maximum.
180 	uint32_t threads_max;
181 
182 	/// Number of thread structures that have been initialized from
183 	/// "threads", and thus the number of worker threads actually
184 	/// created so far.
185 	uint32_t threads_initialized;
186 
187 	/// Array of allocated thread-specific structures. When no threads
188 	/// are in use (direct mode) this is NULL. In threaded mode this
189 	/// points to an array of threads_max number of worker_thread structs.
190 	struct worker_thread *threads;
191 
192 	/// Stack of free threads. When a thread finishes, it puts itself
193 	/// back into this stack. This starts as empty because threads
194 	/// are created only when actually needed.
195 	///
196 	/// \note       Use mutex.
197 	struct worker_thread *threads_free;
198 
199 	/// The most recent worker thread to which the main thread writes
200 	/// the new input from the application.
201 	struct worker_thread *thr;
202 
203 	/// Output buffer queue for decompressed data from the worker threads
204 	///
205 	/// \note       Use mutex with operations that need it.
206 	lzma_outq outq;
207 
208 	mythread_mutex mutex;
209 	mythread_cond cond;
210 
211 
212 	/// Memory usage that will not be exceeded in multi-threaded mode.
213 	/// Single-threaded mode can exceed this even by a large amount.
214 	uint64_t memlimit_threading;
215 
216 	/// Memory usage limit that should never be exceeded.
217 	/// LZMA_MEMLIMIT_ERROR will be returned if decoding isn't possible
218 	/// even in single-threaded mode without exceeding this limit.
219 	uint64_t memlimit_stop;
220 
221 	/// Amount of memory in use by the direct mode decoder
222 	/// (coder->block_decoder). In threaded mode this is 0.
223 	uint64_t mem_direct_mode;
224 
225 	/// Amount of memory needed by the running worker threads.
226 	/// This doesn't include the memory needed by the output buffer.
227 	///
228 	/// \note       Use mutex.
229 	uint64_t mem_in_use;
230 
231 	/// Amount of memory used by the idle (cached) threads.
232 	///
233 	/// \note       Use mutex.
234 	uint64_t mem_cached;
235 
236 
237 	/// Amount of memory needed for the filter chain of the next Block.
238 	uint64_t mem_next_filters;
239 
240 	/// Amount of memory needed for the thread-specific input buffer
241 	/// for the next Block.
242 	uint64_t mem_next_in;
243 
244 	/// Amount of memory actually needed to decode the next Block
245 	/// in threaded mode. This is
246 	/// mem_next_filters + mem_next_in + memory needed for lzma_outbuf.
247 	uint64_t mem_next_block;
248 
249 
250 	/// Amount of compressed data in Stream Header + Blocks that have
251 	/// already been finished.
252 	///
253 	/// \note       Use mutex.
254 	uint64_t progress_in;
255 
256 	/// Amount of uncompressed data in Blocks that have already
257 	/// been finished.
258 	///
259 	/// \note       Use mutex.
260 	uint64_t progress_out;
261 
262 
263 	/// If true, LZMA_NO_CHECK is returned if the Stream has
264 	/// no integrity check.
265 	bool tell_no_check;
266 
267 	/// If true, LZMA_UNSUPPORTED_CHECK is returned if the Stream has
268 	/// an integrity check that isn't supported by this liblzma build.
269 	bool tell_unsupported_check;
270 
271 	/// If true, LZMA_GET_CHECK is returned after decoding Stream Header.
272 	bool tell_any_check;
273 
274 	/// If true, we will tell the Block decoder to skip calculating
275 	/// and verifying the integrity check.
276 	bool ignore_check;
277 
278 	/// If true, we will decode concatenated Streams that possibly have
279 	/// Stream Padding between or after them. LZMA_STREAM_END is returned
280 	/// once the application isn't giving us any new input (LZMA_FINISH),
281 	/// and we aren't in the middle of a Stream, and possible
282 	/// Stream Padding is a multiple of four bytes.
283 	bool concatenated;
284 
285 	/// If true, we will return any errors immediately instead of first
286 	/// producing all output before the location of the error.
287 	bool fail_fast;
288 
289 
290 	/// When decoding concatenated Streams, this is true as long as we
291 	/// are decoding the first Stream. This is needed to avoid misleading
292 	/// LZMA_FORMAT_ERROR in case the later Streams don't have valid magic
293 	/// bytes.
294 	bool first_stream;
295 
296 	/// This is used to track if the previous call to stream_decode_mt()
297 	/// had output space (*out_pos < out_size) and managed to fill the
298 	/// output buffer (*out_pos == out_size). This may be set to true
299 	/// in read_output_and_wait(). This is read and then reset to false
300 	/// at the beginning of stream_decode_mt().
301 	///
302 	/// This is needed to support applications that call lzma_code() in
303 	/// such a way that more input is provided only when lzma_code()
304 	/// didn't fill the output buffer completely. Basically, this makes
305 	/// it easier to convert such applications from single-threaded
306 	/// decoder to multi-threaded decoder.
307 	bool out_was_filled;
308 
309 	/// Write position in buffer[] and position in Stream Padding
310 	size_t pos;
311 
312 	/// Buffer to hold Stream Header, Block Header, and Stream Footer.
313 	/// Block Header has biggest maximum size.
314 	uint8_t buffer[LZMA_BLOCK_HEADER_SIZE_MAX];
315 };
316 
317 
318 /// Enables updating of outbuf->pos. This is a callback function that is
319 /// used with lzma_outq_enable_partial_output().
320 static void
worker_enable_partial_update(void * thr_ptr)321 worker_enable_partial_update(void *thr_ptr)
322 {
323 	struct worker_thread *thr = thr_ptr;
324 
325 	mythread_sync(thr->mutex) {
326 		thr->partial_update_enabled = true;
327 		mythread_cond_signal(&thr->cond);
328 	}
329 }
330 
331 
332 static MYTHREAD_RET_TYPE
worker_decoder(void * thr_ptr)333 worker_decoder(void *thr_ptr)
334 {
335 	struct worker_thread *thr = thr_ptr;
336 	size_t in_filled;
337 	bool partial_update_enabled;
338 	lzma_ret ret;
339 
340 next_loop_lock:
341 
342 	mythread_mutex_lock(&thr->mutex);
343 next_loop_unlocked:
344 
345 	if (thr->state == THR_IDLE) {
346 		mythread_cond_wait(&thr->cond, &thr->mutex);
347 		goto next_loop_unlocked;
348 	}
349 
350 	if (thr->state == THR_EXIT) {
351 		mythread_mutex_unlock(&thr->mutex);
352 
353 		lzma_free(thr->in, thr->allocator);
354 		lzma_next_end(&thr->block_decoder, thr->allocator);
355 
356 		mythread_mutex_destroy(&thr->mutex);
357 		mythread_cond_destroy(&thr->cond);
358 
359 		return MYTHREAD_RET_VALUE;
360 	}
361 
362 	assert(thr->state == THR_RUN);
363 
364 	// Update progress info for get_progress().
365 	thr->progress_in = thr->in_pos;
366 	thr->progress_out = thr->out_pos;
367 
368 	// If we don't have any new input, wait for a signal from the main
369 	// thread except if partial output has just been enabled
370 	// (partial_update_enabled is true but _started is false). In that
371 	// case we will do one normal run so that the partial output info
372 	// gets passed to the main thread. The call to block_decoder.code()
373 	// is useless but harmless as it can occur only once per Block.
374 	in_filled = thr->in_filled;
375 	partial_update_enabled = thr->partial_update_enabled;
376 
377 	if (in_filled == thr->in_pos && !(partial_update_enabled
378 			&& !thr->partial_update_started)) {
379 		mythread_cond_wait(&thr->cond, &thr->mutex);
380 		goto next_loop_unlocked;
381 	}
382 
383 	mythread_mutex_unlock(&thr->mutex);
384 
385 	// Pass the input in small chunks to the Block decoder.
386 	// This way we react reasonably fast if we are told to stop/exit,
387 	// and (when partial update is enabled) we tell about our progress
388 	// to the main thread frequently enough.
389 	const size_t chunk_size = 16384;
390 	if ((in_filled - thr->in_pos) > chunk_size)
391 		in_filled = thr->in_pos + chunk_size;
392 
393 	ret = thr->block_decoder.code(
394 			thr->block_decoder.coder, thr->allocator,
395 			thr->in, &thr->in_pos, in_filled,
396 			thr->outbuf->buf, &thr->out_pos,
397 			thr->outbuf->allocated, LZMA_RUN);
398 
399 	if (ret == LZMA_OK) {
400 		if (partial_update_enabled) {
401 			// Remember that we have done at least one partial
402 			// update. After the first update we won't do updates
403 			// unless we have made progress.
404 			thr->partial_update_started = true;
405 
406 			// The main thread is reading decompressed data
407 			// from thr->outbuf. Tell the main thread about
408 			// our progress.
409 			//
410 			// NOTE: It's possible that we consumed input without
411 			// producing any new output so it's possible that only
412 			// in_pos has changed. If thr->partial_update_started
413 			// was false, it is possible that neither in_pos nor
414 			// out_pos has changed.
415 			mythread_sync(thr->coder->mutex) {
416 				thr->outbuf->pos = thr->out_pos;
417 				thr->outbuf->decoder_in_pos = thr->in_pos;
418 				mythread_cond_signal(&thr->coder->cond);
419 			}
420 		}
421 
422 		goto next_loop_lock;
423 	}
424 
425 	// Either we finished successfully (LZMA_STREAM_END) or an error
426 	// occurred.
427 	//
428 	// The sizes are in the Block Header and the Block decoder
429 	// checks that they match, thus we know these:
430 	assert(ret != LZMA_STREAM_END || thr->in_pos == thr->in_size);
431 	assert(ret != LZMA_STREAM_END
432 		|| thr->out_pos == thr->block_options.uncompressed_size);
433 
434 	mythread_sync(thr->mutex) {
435 		// Block decoder ensures this, but do a sanity check anyway
436 		// because thr->in_filled < thr->in_size means that the main
437 		// thread is still writing to thr->in.
438 		if (ret == LZMA_STREAM_END && thr->in_filled != thr->in_size) {
439 			assert(0);
440 			ret = LZMA_PROG_ERROR;
441 		}
442 
443 		if (thr->state != THR_EXIT)
444 			thr->state = THR_IDLE;
445 	}
446 
447 	// Free the input buffer. Don't update in_size as we need
448 	// it later to update thr->coder->mem_in_use.
449 	//
450 	// This step is skipped if an error occurred because the main thread
451 	// might still be writing to thr->in. The memory will be freed after
452 	// threads_end() sets thr->state = THR_EXIT.
453 	if (ret == LZMA_STREAM_END) {
454 		lzma_free(thr->in, thr->allocator);
455 		thr->in = NULL;
456 	}
457 
458 	mythread_sync(thr->coder->mutex) {
459 		// Move our progress info to the main thread.
460 		thr->coder->progress_in += thr->in_pos;
461 		thr->coder->progress_out += thr->out_pos;
462 		thr->progress_in = 0;
463 		thr->progress_out = 0;
464 
465 		// Mark the outbuf as finished.
466 		thr->outbuf->pos = thr->out_pos;
467 		thr->outbuf->decoder_in_pos = thr->in_pos;
468 		thr->outbuf->finished = true;
469 		thr->outbuf->finish_ret = ret;
470 		thr->outbuf = NULL;
471 
472 		// If an error occurred, tell it to the main thread.
473 		if (ret != LZMA_STREAM_END
474 				&& thr->coder->thread_error == LZMA_OK)
475 			thr->coder->thread_error = ret;
476 
477 		// Return the worker thread to the stack of available
478 		// threads only if no errors occurred.
479 		if (ret == LZMA_STREAM_END) {
480 			// Update memory usage counters.
481 			thr->coder->mem_in_use -= thr->in_size;
482 			thr->coder->mem_in_use -= thr->mem_filters;
483 			thr->coder->mem_cached += thr->mem_filters;
484 
485 			// Put this thread to the stack of free threads.
486 			thr->next = thr->coder->threads_free;
487 			thr->coder->threads_free = thr;
488 		}
489 
490 		mythread_cond_signal(&thr->coder->cond);
491 	}
492 
493 	goto next_loop_lock;
494 }
495 
496 
497 /// Tells the worker threads to exit and waits for them to terminate.
498 static void
threads_end(struct lzma_stream_coder * coder,const lzma_allocator * allocator)499 threads_end(struct lzma_stream_coder *coder, const lzma_allocator *allocator)
500 {
501 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
502 		mythread_sync(coder->threads[i].mutex) {
503 			coder->threads[i].state = THR_EXIT;
504 			mythread_cond_signal(&coder->threads[i].cond);
505 		}
506 	}
507 
508 	for (uint32_t i = 0; i < coder->threads_initialized; ++i)
509 		mythread_join(coder->threads[i].thread_id);
510 
511 	lzma_free(coder->threads, allocator);
512 	coder->threads_initialized = 0;
513 	coder->threads = NULL;
514 	coder->threads_free = NULL;
515 
516 	// The threads don't update these when they exit. Do it here.
517 	coder->mem_in_use = 0;
518 	coder->mem_cached = 0;
519 
520 	return;
521 }
522 
523 
524 /// Tell worker threads to stop without doing any cleaning up.
525 /// The clean up will be done when threads_exit() is called;
526 /// it's not possible to reuse the threads after threads_stop().
527 ///
528 /// This is called before returning an unrecoverable error code
529 /// to the application. It would be waste of processor time
530 /// to keep the threads running in such a situation.
531 static void
threads_stop(struct lzma_stream_coder * coder)532 threads_stop(struct lzma_stream_coder *coder)
533 {
534 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
535 		// The threads that are in the THR_RUN state will stop
536 		// when they check the state the next time. There's no
537 		// need to signal coder->threads[i].cond.
538 		mythread_sync(coder->threads[i].mutex) {
539 			coder->threads[i].state = THR_IDLE;
540 		}
541 	}
542 
543 	return;
544 }
545 
546 
547 /// Initialize a new worker_thread structure and create a new thread.
548 static lzma_ret
initialize_new_thread(struct lzma_stream_coder * coder,const lzma_allocator * allocator)549 initialize_new_thread(struct lzma_stream_coder *coder,
550 		const lzma_allocator *allocator)
551 {
552 	// Allocate the coder->threads array if needed. It's done here instead
553 	// of when initializing the decoder because we don't need this if we
554 	// use the direct mode (we may even free coder->threads in the middle
555 	// of the file if we switch from threaded to direct mode).
556 	if (coder->threads == NULL) {
557 		coder->threads = lzma_alloc(
558 			coder->threads_max * sizeof(struct worker_thread),
559 			allocator);
560 
561 		if (coder->threads == NULL)
562 			return LZMA_MEM_ERROR;
563 	}
564 
565 	// Pick a free structure.
566 	assert(coder->threads_initialized < coder->threads_max);
567 	struct worker_thread *thr
568 			= &coder->threads[coder->threads_initialized];
569 
570 	if (mythread_mutex_init(&thr->mutex))
571 		goto error_mutex;
572 
573 	if (mythread_cond_init(&thr->cond))
574 		goto error_cond;
575 
576 	thr->state = THR_IDLE;
577 	thr->in = NULL;
578 	thr->in_size = 0;
579 	thr->allocator = allocator;
580 	thr->coder = coder;
581 	thr->outbuf = NULL;
582 	thr->block_decoder = LZMA_NEXT_CODER_INIT;
583 	thr->mem_filters = 0;
584 
585 	if (mythread_create(&thr->thread_id, worker_decoder, thr))
586 		goto error_thread;
587 
588 	++coder->threads_initialized;
589 	coder->thr = thr;
590 
591 	return LZMA_OK;
592 
593 error_thread:
594 	mythread_cond_destroy(&thr->cond);
595 
596 error_cond:
597 	mythread_mutex_destroy(&thr->mutex);
598 
599 error_mutex:
600 	return LZMA_MEM_ERROR;
601 }
602 
603 
604 static lzma_ret
get_thread(struct lzma_stream_coder * coder,const lzma_allocator * allocator)605 get_thread(struct lzma_stream_coder *coder, const lzma_allocator *allocator)
606 {
607 	// If there is a free structure on the stack, use it.
608 	mythread_sync(coder->mutex) {
609 		if (coder->threads_free != NULL) {
610 			coder->thr = coder->threads_free;
611 			coder->threads_free = coder->threads_free->next;
612 
613 			// The thread is no longer in the cache so subtract
614 			// it from the cached memory usage. Don't add it
615 			// to mem_in_use though; the caller will handle it
616 			// since it knows how much memory it will actually
617 			// use (the filter chain might change).
618 			coder->mem_cached -= coder->thr->mem_filters;
619 		}
620 	}
621 
622 	if (coder->thr == NULL) {
623 		assert(coder->threads_initialized < coder->threads_max);
624 
625 		// Initialize a new thread.
626 		return_if_error(initialize_new_thread(coder, allocator));
627 	}
628 
629 	coder->thr->in_filled = 0;
630 	coder->thr->in_pos = 0;
631 	coder->thr->out_pos = 0;
632 
633 	coder->thr->progress_in = 0;
634 	coder->thr->progress_out = 0;
635 
636 	coder->thr->partial_update_enabled = false;
637 	coder->thr->partial_update_started = false;
638 
639 	return LZMA_OK;
640 }
641 
642 
643 static lzma_ret
read_output_and_wait(struct lzma_stream_coder * coder,const lzma_allocator * allocator,uint8_t * restrict out,size_t * restrict out_pos,size_t out_size,bool * input_is_possible,bool waiting_allowed,mythread_condtime * wait_abs,bool * has_blocked)644 read_output_and_wait(struct lzma_stream_coder *coder,
645 		const lzma_allocator *allocator,
646 		uint8_t *restrict out, size_t *restrict out_pos,
647 		size_t out_size,
648 		bool *input_is_possible,
649 		bool waiting_allowed,
650 		mythread_condtime *wait_abs, bool *has_blocked)
651 {
652 	lzma_ret ret = LZMA_OK;
653 
654 	mythread_sync(coder->mutex) {
655 		do {
656 			// Get as much output from the queue as is possible
657 			// without blocking.
658 			const size_t out_start = *out_pos;
659 			do {
660 				ret = lzma_outq_read(&coder->outq, allocator,
661 						out, out_pos, out_size,
662 						NULL, NULL);
663 
664 				// If a Block was finished, tell the worker
665 				// thread of the next Block (if it is still
666 				// running) to start telling the main thread
667 				// when new output is available.
668 				if (ret == LZMA_STREAM_END)
669 					lzma_outq_enable_partial_output(
670 						&coder->outq,
671 						&worker_enable_partial_update);
672 
673 				// Loop until a Block wasn't finished.
674 				// It's important to loop around even if
675 				// *out_pos == out_size because there could
676 				// be an empty Block that will return
677 				// LZMA_STREAM_END without needing any
678 				// output space.
679 			} while (ret == LZMA_STREAM_END);
680 
681 			// Check if lzma_outq_read reported an error from
682 			// the Block decoder.
683 			if (ret != LZMA_OK)
684 				break;
685 
686 			// If the output buffer is now full but it wasn't full
687 			// when this function was called, set out_was_filled.
688 			// This way the next call to stream_decode_mt() knows
689 			// that some output was produced and no output space
690 			// remained in the previous call to stream_decode_mt().
691 			if (*out_pos == out_size && *out_pos != out_start)
692 				coder->out_was_filled = true;
693 
694 			// Check if any thread has indicated an error.
695 			if (coder->thread_error != LZMA_OK) {
696 				// If LZMA_FAIL_FAST was used, report errors
697 				// from worker threads immediately.
698 				if (coder->fail_fast) {
699 					ret = coder->thread_error;
700 					break;
701 				}
702 
703 				// Otherwise set pending_error. The value we
704 				// set here will not actually get used other
705 				// than working as a flag that an error has
706 				// occurred. This is because in SEQ_ERROR
707 				// all output before the error will be read
708 				// first by calling this function, and once we
709 				// reach the location of the (first) error the
710 				// error code from the above lzma_outq_read()
711 				// will be returned to the application.
712 				//
713 				// Use LZMA_PROG_ERROR since the value should
714 				// never leak to the application. It's
715 				// possible that pending_error has already
716 				// been set but that doesn't matter: if we get
717 				// here, pending_error only works as a flag.
718 				coder->pending_error = LZMA_PROG_ERROR;
719 			}
720 
721 			// Check if decoding of the next Block can be started.
722 			// The memusage of the active threads must be low
723 			// enough, there must be a free buffer slot in the
724 			// output queue, and there must be a free thread
725 			// (that can be either created or an existing one
726 			// reused).
727 			//
728 			// NOTE: This is checked after reading the output
729 			// above because reading the output can free a slot in
730 			// the output queue and also reduce active memusage.
731 			//
732 			// NOTE: If output queue is empty, then input will
733 			// always be possible.
734 			if (input_is_possible != NULL
735 					&& coder->memlimit_threading
736 						- coder->mem_in_use
737 						- coder->outq.mem_in_use
738 						>= coder->mem_next_block
739 					&& lzma_outq_has_buf(&coder->outq)
740 					&& (coder->threads_initialized
741 							< coder->threads_max
742 						|| coder->threads_free
743 							!= NULL)) {
744 				*input_is_possible = true;
745 				break;
746 			}
747 
748 			// If the caller doesn't want us to block, return now.
749 			if (!waiting_allowed)
750 				break;
751 
752 			// This check is needed only when input_is_possible
753 			// is NULL. We must return if we aren't waiting for
754 			// input to become possible and there is no more
755 			// output coming from the queue.
756 			if (lzma_outq_is_empty(&coder->outq)) {
757 				assert(input_is_possible == NULL);
758 				break;
759 			}
760 
761 			// If there is more data available from the queue,
762 			// our out buffer must be full and we need to return
763 			// so that the application can provide more output
764 			// space.
765 			//
766 			// NOTE: In general lzma_outq_is_readable() can return
767 			// true also when there are no more bytes available.
768 			// This can happen when a Block has finished without
769 			// providing any new output. We know that this is not
770 			// the case because in the beginning of this loop we
771 			// tried to read as much as possible even when we had
772 			// no output space left and the mutex has been locked
773 			// all the time (so worker threads cannot have changed
774 			// anything). Thus there must be actual pending output
775 			// in the queue.
776 			if (lzma_outq_is_readable(&coder->outq)) {
777 				assert(*out_pos == out_size);
778 				break;
779 			}
780 
781 			// If the application stops providing more input
782 			// in the middle of a Block, there will eventually
783 			// be one worker thread left that is stuck waiting for
784 			// more input (that might never arrive) and a matching
785 			// outbuf which the worker thread cannot finish due
786 			// to lack of input. We must detect this situation,
787 			// otherwise we would end up waiting indefinitely
788 			// (if no timeout is in use) or keep returning
789 			// LZMA_TIMED_OUT while making no progress. Thus, the
790 			// application would never get LZMA_BUF_ERROR from
791 			// lzma_code() which would tell the application that
792 			// no more progress is possible. No LZMA_BUF_ERROR
793 			// means that, for example, truncated .xz files could
794 			// cause an infinite loop.
795 			//
796 			// A worker thread doing partial updates will
797 			// store not only the output position in outbuf->pos
798 			// but also the matching input position in
799 			// outbuf->decoder_in_pos. Here we check if that
800 			// input position matches the amount of input that
801 			// the worker thread has been given (in_filled).
802 			// If so, we must return and not wait as no more
803 			// output will be coming without first getting more
804 			// input to the worker thread. If the application
805 			// keeps calling lzma_code() without providing more
806 			// input, it will eventually get LZMA_BUF_ERROR.
807 			//
808 			// NOTE: We can read partial_update_enabled and
809 			// in_filled without thr->mutex as only the main thread
810 			// modifies these variables. decoder_in_pos requires
811 			// coder->mutex which we are already holding.
812 			if (coder->thr != NULL &&
813 					coder->thr->partial_update_enabled) {
814 				// There is exactly one outbuf in the queue.
815 				assert(coder->thr->outbuf == coder->outq.head);
816 				assert(coder->thr->outbuf == coder->outq.tail);
817 
818 				if (coder->thr->outbuf->decoder_in_pos
819 						== coder->thr->in_filled)
820 					break;
821 			}
822 
823 			// Wait for input or output to become possible.
824 			if (coder->timeout != 0) {
825 				// See the comment in stream_encoder_mt.c
826 				// about why mythread_condtime_set() is used
827 				// like this.
828 				//
829 				// FIXME?
830 				// In contrast to the encoder, this calls
831 				// _condtime_set while the mutex is locked.
832 				if (!*has_blocked) {
833 					*has_blocked = true;
834 					mythread_condtime_set(wait_abs,
835 							&coder->cond,
836 							coder->timeout);
837 				}
838 
839 				if (mythread_cond_timedwait(&coder->cond,
840 						&coder->mutex,
841 						wait_abs) != 0) {
842 					ret = LZMA_TIMED_OUT;
843 					break;
844 				}
845 			} else {
846 				mythread_cond_wait(&coder->cond,
847 						&coder->mutex);
848 			}
849 		} while (ret == LZMA_OK);
850 	}
851 
852 	// If we are returning an error, then the application cannot get
853 	// more output from us and thus keeping the threads running is
854 	// useless and waste of CPU time.
855 	if (ret != LZMA_OK && ret != LZMA_TIMED_OUT)
856 		threads_stop(coder);
857 
858 	return ret;
859 }
860 
861 
862 static lzma_ret
decode_block_header(struct lzma_stream_coder * coder,const lzma_allocator * allocator,const uint8_t * restrict in,size_t * restrict in_pos,size_t in_size)863 decode_block_header(struct lzma_stream_coder *coder,
864 		const lzma_allocator *allocator, const uint8_t *restrict in,
865 		size_t *restrict in_pos, size_t in_size)
866 {
867 	if (*in_pos >= in_size)
868 		return LZMA_OK;
869 
870 	if (coder->pos == 0) {
871 		// Detect if it's Index.
872 		if (in[*in_pos] == INDEX_INDICATOR)
873 			return LZMA_INDEX_DETECTED;
874 
875 		// Calculate the size of the Block Header. Note that
876 		// Block Header decoder wants to see this byte too
877 		// so don't advance *in_pos.
878 		coder->block_options.header_size
879 				= lzma_block_header_size_decode(
880 					in[*in_pos]);
881 	}
882 
883 	// Copy the Block Header to the internal buffer.
884 	lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos,
885 			coder->block_options.header_size);
886 
887 	// Return if we didn't get the whole Block Header yet.
888 	if (coder->pos < coder->block_options.header_size)
889 		return LZMA_OK;
890 
891 	coder->pos = 0;
892 
893 	// Version 1 is needed to support the .ignore_check option.
894 	coder->block_options.version = 1;
895 
896 	// Block Header decoder will initialize all members of this array
897 	// so we don't need to do it here.
898 	coder->block_options.filters = coder->filters;
899 
900 	// Decode the Block Header.
901 	return_if_error(lzma_block_header_decode(&coder->block_options,
902 			allocator, coder->buffer));
903 
904 	// If LZMA_IGNORE_CHECK was used, this flag needs to be set.
905 	// It has to be set after lzma_block_header_decode() because
906 	// it always resets this to false.
907 	coder->block_options.ignore_check = coder->ignore_check;
908 
909 	// coder->block_options is ready now.
910 	return LZMA_STREAM_END;
911 }
912 
913 
914 /// Get the size of the Compressed Data + Block Padding + Check.
915 static size_t
comp_blk_size(const struct lzma_stream_coder * coder)916 comp_blk_size(const struct lzma_stream_coder *coder)
917 {
918 	return vli_ceil4(coder->block_options.compressed_size)
919 			+ lzma_check_size(coder->stream_flags.check);
920 }
921 
922 
923 /// Returns true if the size (compressed or uncompressed) is such that
924 /// threaded decompression cannot be used. Sizes that are too big compared
925 /// to SIZE_MAX must be rejected to avoid integer overflows and truncations
926 /// when lzma_vli is assigned to a size_t.
927 static bool
is_direct_mode_needed(lzma_vli size)928 is_direct_mode_needed(lzma_vli size)
929 {
930 	return size == LZMA_VLI_UNKNOWN || size > SIZE_MAX / 3;
931 }
932 
933 
934 static lzma_ret
stream_decoder_reset(struct lzma_stream_coder * coder,const lzma_allocator * allocator)935 stream_decoder_reset(struct lzma_stream_coder *coder,
936 		const lzma_allocator *allocator)
937 {
938 	// Initialize the Index hash used to verify the Index.
939 	coder->index_hash = lzma_index_hash_init(coder->index_hash, allocator);
940 	if (coder->index_hash == NULL)
941 		return LZMA_MEM_ERROR;
942 
943 	// Reset the rest of the variables.
944 	coder->sequence = SEQ_STREAM_HEADER;
945 	coder->pos = 0;
946 
947 	return LZMA_OK;
948 }
949 
950 
951 static lzma_ret
stream_decode_mt(void * coder_ptr,const lzma_allocator * allocator,const uint8_t * restrict in,size_t * restrict in_pos,size_t in_size,uint8_t * restrict out,size_t * restrict out_pos,size_t out_size,lzma_action action)952 stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
953 		 const uint8_t *restrict in, size_t *restrict in_pos,
954 		 size_t in_size,
955 		 uint8_t *restrict out, size_t *restrict out_pos,
956 		 size_t out_size, lzma_action action)
957 {
958 	struct lzma_stream_coder *coder = coder_ptr;
959 
960 	mythread_condtime wait_abs;
961 	bool has_blocked = false;
962 
963 	// Determine if in SEQ_BLOCK_HEADER and SEQ_BLOCK_THR_RUN we should
964 	// tell read_output_and_wait() to wait until it can fill the output
965 	// buffer (or a timeout occurs). Two conditions must be met:
966 	//
967 	// (1) If the caller provided no new input. The reason for this
968 	//     can be, for example, the end of the file or that there is
969 	//     a pause in the input stream and more input is available
970 	//     a little later. In this situation we should wait for output
971 	//     because otherwise we would end up in a busy-waiting loop where
972 	//     we make no progress and the application just calls us again
973 	//     without providing any new input. This would then result in
974 	//     LZMA_BUF_ERROR even though more output would be available
975 	//     once the worker threads decode more data.
976 	//
977 	// (2) Even if (1) is true, we will not wait if the previous call to
978 	//     this function managed to produce some output and the output
979 	//     buffer became full. This is for compatibility with applications
980 	//     that call lzma_code() in such a way that new input is provided
981 	//     only when the output buffer didn't become full. Without this
982 	//     trick such applications would have bad performance (bad
983 	//     parallelization due to decoder not getting input fast enough).
984 	//
985 	//     NOTE: Such loops might require that timeout is disabled (0)
986 	//     if they assume that output-not-full implies that all input has
987 	//     been consumed. If and only if timeout is enabled, we may return
988 	//     when output isn't full *and* not all input has been consumed.
989 	//
990 	// However, if LZMA_FINISH is used, the above is ignored and we always
991 	// wait (timeout can still cause us to return) because we know that
992 	// we won't get any more input. This matters if the input file is
993 	// truncated and we are doing single-shot decoding, that is,
994 	// timeout = 0 and LZMA_FINISH is used on the first call to
995 	// lzma_code() and the output buffer is known to be big enough
996 	// to hold all uncompressed data:
997 	//
998 	//   - If LZMA_FINISH wasn't handled specially, we could return
999 	//     LZMA_OK before providing all output that is possible with the
1000 	//     truncated input. The rest would be available if lzma_code() was
1001 	//     called again but then it's not single-shot decoding anymore.
1002 	//
1003 	//   - By handling LZMA_FINISH specially here, the first call will
1004 	//     produce all the output, matching the behavior of the
1005 	//     single-threaded decoder.
1006 	//
1007 	// So it's a very specific corner case but also easy to avoid. Note
1008 	// that this special handling of LZMA_FINISH has no effect for
1009 	// single-shot decoding when the input file is valid (not truncated);
1010 	// premature LZMA_OK wouldn't be possible as long as timeout = 0.
1011 	const bool waiting_allowed = action == LZMA_FINISH
1012 			|| (*in_pos == in_size && !coder->out_was_filled);
1013 	coder->out_was_filled = false;
1014 
1015 	while (true)
1016 	switch (coder->sequence) {
1017 	case SEQ_STREAM_HEADER: {
1018 		// Copy the Stream Header to the internal buffer.
1019 		const size_t in_old = *in_pos;
1020 		lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos,
1021 				LZMA_STREAM_HEADER_SIZE);
1022 		coder->progress_in += *in_pos - in_old;
1023 
1024 		// Return if we didn't get the whole Stream Header yet.
1025 		if (coder->pos < LZMA_STREAM_HEADER_SIZE)
1026 			return LZMA_OK;
1027 
1028 		coder->pos = 0;
1029 
1030 		// Decode the Stream Header.
1031 		const lzma_ret ret = lzma_stream_header_decode(
1032 				&coder->stream_flags, coder->buffer);
1033 		if (ret != LZMA_OK)
1034 			return ret == LZMA_FORMAT_ERROR && !coder->first_stream
1035 					? LZMA_DATA_ERROR : ret;
1036 
1037 		// If we are decoding concatenated Streams, and the later
1038 		// Streams have invalid Header Magic Bytes, we give
1039 		// LZMA_DATA_ERROR instead of LZMA_FORMAT_ERROR.
1040 		coder->first_stream = false;
1041 
1042 		// Copy the type of the Check so that Block Header and Block
1043 		// decoders see it.
1044 		coder->block_options.check = coder->stream_flags.check;
1045 
1046 		// Even if we return LZMA_*_CHECK below, we want
1047 		// to continue from Block Header decoding.
1048 		coder->sequence = SEQ_BLOCK_HEADER;
1049 
1050 		// Detect if there's no integrity check or if it is
1051 		// unsupported if those were requested by the application.
1052 		if (coder->tell_no_check && coder->stream_flags.check
1053 				== LZMA_CHECK_NONE)
1054 			return LZMA_NO_CHECK;
1055 
1056 		if (coder->tell_unsupported_check
1057 				&& !lzma_check_is_supported(
1058 					coder->stream_flags.check))
1059 			return LZMA_UNSUPPORTED_CHECK;
1060 
1061 		if (coder->tell_any_check)
1062 			return LZMA_GET_CHECK;
1063 
1064 		FALLTHROUGH;
1065 	}
1066 
1067 	case SEQ_BLOCK_HEADER: {
1068 		const size_t in_old = *in_pos;
1069 		const lzma_ret ret = decode_block_header(coder, allocator,
1070 				in, in_pos, in_size);
1071 		coder->progress_in += *in_pos - in_old;
1072 
1073 		if (ret == LZMA_OK) {
1074 			// We didn't decode the whole Block Header yet.
1075 			//
1076 			// Read output from the queue before returning. This
1077 			// is important because it is possible that the
1078 			// application doesn't have any new input available
1079 			// immediately. If we didn't try to copy output from
1080 			// the output queue here, lzma_code() could end up
1081 			// returning LZMA_BUF_ERROR even though queued output
1082 			// is available.
1083 			//
1084 			// If the lzma_code() call provided at least one input
1085 			// byte, only copy as much data from the output queue
1086 			// as is available immediately. This way the
1087 			// application will be able to provide more input
1088 			// without a delay.
1089 			//
1090 			// On the other hand, if lzma_code() was called with
1091 			// an empty input buffer(*), treat it specially: try
1092 			// to fill the output buffer even if it requires
1093 			// waiting for the worker threads to provide output
1094 			// (timeout, if specified, can still cause us to
1095 			// return).
1096 			//
1097 			//   - This way the application will be able to get all
1098 			//     data that can be decoded from the input provided
1099 			//     so far.
1100 			//
1101 			//   - We avoid both premature LZMA_BUF_ERROR and
1102 			//     busy-waiting where the application repeatedly
1103 			//     calls lzma_code() which immediately returns
1104 			//     LZMA_OK without providing new data.
1105 			//
1106 			//   - If the queue becomes empty, we won't wait
1107 			//     anything and will return LZMA_OK immediately
1108 			//     (coder->timeout is completely ignored).
1109 			//
1110 			// (*) See the comment at the beginning of this
1111 			//     function how waiting_allowed is determined
1112 			//     and why there is an exception to the rule
1113 			//     of "called with an empty input buffer".
1114 			assert(*in_pos == in_size);
1115 
1116 			// If LZMA_FINISH was used we know that we won't get
1117 			// more input, so the file must be truncated if we
1118 			// get here. If worker threads don't detect any
1119 			// errors, eventually there will be no more output
1120 			// while we keep returning LZMA_OK which gets
1121 			// converted to LZMA_BUF_ERROR in lzma_code().
1122 			//
1123 			// If fail-fast is enabled then we will return
1124 			// immediately using LZMA_DATA_ERROR instead of
1125 			// LZMA_OK or LZMA_BUF_ERROR. Rationale for the
1126 			// error code:
1127 			//
1128 			//   - Worker threads may have a large amount of
1129 			//     not-yet-decoded input data and we don't
1130 			//     know for sure if all data is valid. Bad
1131 			//     data there would result in LZMA_DATA_ERROR
1132 			//     when fail-fast isn't used.
1133 			//
1134 			//   - Immediate LZMA_BUF_ERROR would be a bit weird
1135 			//     considering the older liblzma code. lzma_code()
1136 			//     even has an assertion to prevent coders from
1137 			//     returning LZMA_BUF_ERROR directly.
1138 			//
1139 			// The downside of this is that with fail-fast apps
1140 			// cannot always distinguish between corrupt and
1141 			// truncated files.
1142 			if (action == LZMA_FINISH && coder->fail_fast) {
1143 				// We won't produce any more output. Stop
1144 				// the unfinished worker threads so they
1145 				// won't waste CPU time.
1146 				threads_stop(coder);
1147 				return LZMA_DATA_ERROR;
1148 			}
1149 
1150 			// read_output_and_wait() will call threads_stop()
1151 			// if needed so with that we can use return_if_error.
1152 			return_if_error(read_output_and_wait(coder, allocator,
1153 				out, out_pos, out_size,
1154 				NULL, waiting_allowed,
1155 				&wait_abs, &has_blocked));
1156 
1157 			if (coder->pending_error != LZMA_OK) {
1158 				coder->sequence = SEQ_ERROR;
1159 				break;
1160 			}
1161 
1162 			return LZMA_OK;
1163 		}
1164 
1165 		if (ret == LZMA_INDEX_DETECTED) {
1166 			coder->sequence = SEQ_INDEX_WAIT_OUTPUT;
1167 			break;
1168 		}
1169 
1170 		// See if an error occurred.
1171 		if (ret != LZMA_STREAM_END) {
1172 			// NOTE: Here and in all other places where
1173 			// pending_error is set, it may overwrite the value
1174 			// (LZMA_PROG_ERROR) set by read_output_and_wait().
1175 			// That function might overwrite value set here too.
1176 			// These are fine because when read_output_and_wait()
1177 			// sets pending_error, it actually works as a flag
1178 			// variable only ("some error has occurred") and the
1179 			// actual value of pending_error is not used in
1180 			// SEQ_ERROR. In such cases SEQ_ERROR will eventually
1181 			// get the correct error code from the return value of
1182 			// a later read_output_and_wait() call.
1183 			coder->pending_error = ret;
1184 			coder->sequence = SEQ_ERROR;
1185 			break;
1186 		}
1187 
1188 		// Calculate the memory usage of the filters / Block decoder.
1189 		coder->mem_next_filters = lzma_raw_decoder_memusage(
1190 				coder->filters);
1191 
1192 		if (coder->mem_next_filters == UINT64_MAX) {
1193 			// One or more unknown Filter IDs.
1194 			coder->pending_error = LZMA_OPTIONS_ERROR;
1195 			coder->sequence = SEQ_ERROR;
1196 			break;
1197 		}
1198 
1199 		coder->sequence = SEQ_BLOCK_INIT;
1200 		FALLTHROUGH;
1201 	}
1202 
1203 	case SEQ_BLOCK_INIT: {
1204 		// Check if decoding is possible at all with the current
1205 		// memlimit_stop which we must never exceed.
1206 		//
1207 		// This needs to be the first thing in SEQ_BLOCK_INIT
1208 		// to make it possible to restart decoding after increasing
1209 		// memlimit_stop with lzma_memlimit_set().
1210 		if (coder->mem_next_filters > coder->memlimit_stop) {
1211 			// Flush pending output before returning
1212 			// LZMA_MEMLIMIT_ERROR. If the application doesn't
1213 			// want to increase the limit, at least it will get
1214 			// all the output possible so far.
1215 			return_if_error(read_output_and_wait(coder, allocator,
1216 					out, out_pos, out_size,
1217 					NULL, true, &wait_abs, &has_blocked));
1218 
1219 			if (!lzma_outq_is_empty(&coder->outq))
1220 				return LZMA_OK;
1221 
1222 			return LZMA_MEMLIMIT_ERROR;
1223 		}
1224 
1225 		// Check if the size information is available in Block Header.
1226 		// If it is, check if the sizes are small enough that we don't
1227 		// need to worry *too* much about integer overflows later in
1228 		// the code. If these conditions are not met, we must use the
1229 		// single-threaded direct mode.
1230 		if (is_direct_mode_needed(coder->block_options.compressed_size)
1231 				|| is_direct_mode_needed(
1232 				coder->block_options.uncompressed_size)) {
1233 			coder->sequence = SEQ_BLOCK_DIRECT_INIT;
1234 			break;
1235 		}
1236 
1237 		// Calculate the amount of memory needed for the input and
1238 		// output buffers in threaded mode.
1239 		//
1240 		// These cannot overflow because we already checked that
1241 		// the sizes are small enough using is_direct_mode_needed().
1242 		coder->mem_next_in = comp_blk_size(coder);
1243 		const uint64_t mem_buffers = coder->mem_next_in
1244 				+ lzma_outq_outbuf_memusage(
1245 				coder->block_options.uncompressed_size);
1246 
1247 		// Add the amount needed by the filters.
1248 		// Avoid integer overflows.
1249 		if (UINT64_MAX - mem_buffers < coder->mem_next_filters) {
1250 			// Use direct mode if the memusage would overflow.
1251 			// This is a theoretical case that shouldn't happen
1252 			// in practice unless the input file is weird (broken
1253 			// or malicious).
1254 			coder->sequence = SEQ_BLOCK_DIRECT_INIT;
1255 			break;
1256 		}
1257 
1258 		// Amount of memory needed to decode this Block in
1259 		// threaded mode:
1260 		coder->mem_next_block = coder->mem_next_filters + mem_buffers;
1261 
1262 		// If this alone would exceed memlimit_threading, then we must
1263 		// use the single-threaded direct mode.
1264 		if (coder->mem_next_block > coder->memlimit_threading) {
1265 			coder->sequence = SEQ_BLOCK_DIRECT_INIT;
1266 			break;
1267 		}
1268 
1269 		// Use the threaded mode. Free the direct mode decoder in
1270 		// case it has been initialized.
1271 		lzma_next_end(&coder->block_decoder, allocator);
1272 		coder->mem_direct_mode = 0;
1273 
1274 		// Since we already know what the sizes are supposed to be,
1275 		// we can already add them to the Index hash. The Block
1276 		// decoder will verify the values while decoding.
1277 		const lzma_ret ret = lzma_index_hash_append(coder->index_hash,
1278 				lzma_block_unpadded_size(
1279 					&coder->block_options),
1280 				coder->block_options.uncompressed_size);
1281 		if (ret != LZMA_OK) {
1282 			coder->pending_error = ret;
1283 			coder->sequence = SEQ_ERROR;
1284 			break;
1285 		}
1286 
1287 		coder->sequence = SEQ_BLOCK_THR_INIT;
1288 		FALLTHROUGH;
1289 	}
1290 
1291 	case SEQ_BLOCK_THR_INIT: {
1292 		// We need to wait for multiple conditions to become true
1293 		// until we can initialize the Block decoder and let a worker
1294 		// thread decode it:
1295 		//
1296 		//   - Wait for the memory usage of the active threads to drop
1297 		//     so that starting the decoding of this Block won't make
1298 		//     us go over memlimit_threading.
1299 		//
1300 		//   - Wait for at least one free output queue slot.
1301 		//
1302 		//   - Wait for a free worker thread.
1303 		//
1304 		// While we wait, we must copy decompressed data to the out
1305 		// buffer and catch possible decoder errors.
1306 		//
1307 		// read_output_and_wait() does all the above.
1308 		bool block_can_start = false;
1309 
1310 		return_if_error(read_output_and_wait(coder, allocator,
1311 				out, out_pos, out_size,
1312 				&block_can_start, true,
1313 				&wait_abs, &has_blocked));
1314 
1315 		if (coder->pending_error != LZMA_OK) {
1316 			coder->sequence = SEQ_ERROR;
1317 			break;
1318 		}
1319 
1320 		if (!block_can_start) {
1321 			// It's not a timeout because return_if_error handles
1322 			// it already. Output queue cannot be empty either
1323 			// because in that case block_can_start would have
1324 			// been true. Thus the output buffer must be full and
1325 			// the queue isn't empty.
1326 			assert(*out_pos == out_size);
1327 			assert(!lzma_outq_is_empty(&coder->outq));
1328 			return LZMA_OK;
1329 		}
1330 
1331 		// We know that we can start decoding this Block without
1332 		// exceeding memlimit_threading. However, to stay below
1333 		// memlimit_threading may require freeing some of the
1334 		// cached memory.
1335 		//
1336 		// Get a local copy of variables that require locking the
1337 		// mutex. It is fine if the worker threads modify the real
1338 		// values after we read these as those changes can only be
1339 		// towards more favorable conditions (less memory in use,
1340 		// more in cache).
1341 		//
1342 		// These are initialized to silence warnings.
1343 		uint64_t mem_in_use = 0;
1344 		uint64_t mem_cached = 0;
1345 		struct worker_thread *thr = NULL;
1346 
1347 		mythread_sync(coder->mutex) {
1348 			mem_in_use = coder->mem_in_use;
1349 			mem_cached = coder->mem_cached;
1350 			thr = coder->threads_free;
1351 		}
1352 
1353 		// The maximum amount of memory that can be held by other
1354 		// threads and cached buffers while allowing us to start
1355 		// decoding the next Block.
1356 		const uint64_t mem_max = coder->memlimit_threading
1357 				- coder->mem_next_block;
1358 
1359 		// If the existing allocations are so large that starting
1360 		// to decode this Block might exceed memlimit_threading,
1361 		// try to free memory from the output queue cache first.
1362 		//
1363 		// NOTE: This math assumes the worst case. It's possible
1364 		// that the limit wouldn't be exceeded if the existing cached
1365 		// allocations are reused.
1366 		if (mem_in_use + mem_cached + coder->outq.mem_allocated
1367 				> mem_max) {
1368 			// Clear the outq cache except leave one buffer in
1369 			// the cache if its size is correct. That way we
1370 			// don't free and almost immediately reallocate
1371 			// an identical buffer.
1372 			lzma_outq_clear_cache2(&coder->outq, allocator,
1373 				coder->block_options.uncompressed_size);
1374 		}
1375 
1376 		// If there is at least one worker_thread in the cache and
1377 		// the existing allocations are so large that starting to
1378 		// decode this Block might exceed memlimit_threading, free
1379 		// memory by freeing cached Block decoders.
1380 		//
1381 		// NOTE: The comparison is different here than above.
1382 		// Here we don't care about cached buffers in outq anymore
1383 		// and only look at memory actually in use. This is because
1384 		// if there is something in outq cache, it's a single buffer
1385 		// that can be used as is. We ensured this in the above
1386 		// if-block.
1387 		uint64_t mem_freed = 0;
1388 		if (thr != NULL && mem_in_use + mem_cached
1389 				+ coder->outq.mem_in_use > mem_max) {
1390 			// Don't free the first Block decoder if its memory
1391 			// usage isn't greater than what this Block will need.
1392 			// Typically the same filter chain is used for all
1393 			// Blocks so this way the allocations can be reused
1394 			// when get_thread() picks the first worker_thread
1395 			// from the cache.
1396 			if (thr->mem_filters <= coder->mem_next_filters)
1397 				thr = thr->next;
1398 
1399 			while (thr != NULL) {
1400 				lzma_next_end(&thr->block_decoder, allocator);
1401 				mem_freed += thr->mem_filters;
1402 				thr->mem_filters = 0;
1403 				thr = thr->next;
1404 			}
1405 		}
1406 
1407 		// Update the memory usage counters. Note that coder->mem_*
1408 		// may have changed since we read them so we must subtract
1409 		// or add the changes.
1410 		mythread_sync(coder->mutex) {
1411 			coder->mem_cached -= mem_freed;
1412 
1413 			// Memory needed for the filters and the input buffer.
1414 			// The output queue takes care of its own counter so
1415 			// we don't touch it here.
1416 			//
1417 			// NOTE: After this, coder->mem_in_use +
1418 			// coder->mem_cached might count the same thing twice.
1419 			// If so, this will get corrected in get_thread() when
1420 			// a worker_thread is picked from coder->threads_free
1421 			// and its memory usage is subtracted from mem_cached.
1422 			coder->mem_in_use += coder->mem_next_in
1423 					+ coder->mem_next_filters;
1424 		}
1425 
1426 		// Allocate memory for the output buffer in the output queue.
1427 		lzma_ret ret = lzma_outq_prealloc_buf(
1428 				&coder->outq, allocator,
1429 				coder->block_options.uncompressed_size);
1430 		if (ret != LZMA_OK) {
1431 			threads_stop(coder);
1432 			return ret;
1433 		}
1434 
1435 		// Set up coder->thr.
1436 		ret = get_thread(coder, allocator);
1437 		if (ret != LZMA_OK) {
1438 			threads_stop(coder);
1439 			return ret;
1440 		}
1441 
1442 		// The new Block decoder memory usage is already counted in
1443 		// coder->mem_in_use. Store it in the thread too.
1444 		coder->thr->mem_filters = coder->mem_next_filters;
1445 
1446 		// Initialize the Block decoder.
1447 		coder->thr->block_options = coder->block_options;
1448 		ret = lzma_block_decoder_init(
1449 					&coder->thr->block_decoder, allocator,
1450 					&coder->thr->block_options);
1451 
1452 		// Free the allocated filter options since they are needed
1453 		// only to initialize the Block decoder.
1454 		lzma_filters_free(coder->filters, allocator);
1455 		coder->thr->block_options.filters = NULL;
1456 
1457 		// Check if memory usage calculation and Block decoder
1458 		// initialization succeeded.
1459 		if (ret != LZMA_OK) {
1460 			coder->pending_error = ret;
1461 			coder->sequence = SEQ_ERROR;
1462 			break;
1463 		}
1464 
1465 		// Allocate the input buffer.
1466 		coder->thr->in_size = coder->mem_next_in;
1467 		coder->thr->in = lzma_alloc(coder->thr->in_size, allocator);
1468 		if (coder->thr->in == NULL) {
1469 			threads_stop(coder);
1470 			return LZMA_MEM_ERROR;
1471 		}
1472 
1473 		// Get the preallocated output buffer.
1474 		coder->thr->outbuf = lzma_outq_get_buf(
1475 				&coder->outq, coder->thr);
1476 
1477 		// Start the decoder.
1478 		mythread_sync(coder->thr->mutex) {
1479 			assert(coder->thr->state == THR_IDLE);
1480 			coder->thr->state = THR_RUN;
1481 			mythread_cond_signal(&coder->thr->cond);
1482 		}
1483 
1484 		// Enable output from the thread that holds the oldest output
1485 		// buffer in the output queue (if such a thread exists).
1486 		mythread_sync(coder->mutex) {
1487 			lzma_outq_enable_partial_output(&coder->outq,
1488 					&worker_enable_partial_update);
1489 		}
1490 
1491 		coder->sequence = SEQ_BLOCK_THR_RUN;
1492 		FALLTHROUGH;
1493 	}
1494 
1495 	case SEQ_BLOCK_THR_RUN: {
1496 		if (action == LZMA_FINISH && coder->fail_fast) {
1497 			// We know that we won't get more input and that
1498 			// the caller wants fail-fast behavior. If we see
1499 			// that we don't have enough input to finish this
1500 			// Block, return LZMA_DATA_ERROR immediately.
1501 			// See SEQ_BLOCK_HEADER for the error code rationale.
1502 			const size_t in_avail = in_size - *in_pos;
1503 			const size_t in_needed = coder->thr->in_size
1504 					- coder->thr->in_filled;
1505 			if (in_avail < in_needed) {
1506 				threads_stop(coder);
1507 				return LZMA_DATA_ERROR;
1508 			}
1509 		}
1510 
1511 		// Copy input to the worker thread.
1512 		size_t cur_in_filled = coder->thr->in_filled;
1513 		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
1514 				&cur_in_filled, coder->thr->in_size);
1515 
1516 		// Tell the thread how much we copied.
1517 		mythread_sync(coder->thr->mutex) {
1518 			coder->thr->in_filled = cur_in_filled;
1519 
1520 			// NOTE: Most of the time we are copying input faster
1521 			// than the thread can decode so most of the time
1522 			// calling mythread_cond_signal() is useless but
1523 			// we cannot make it conditional because thr->in_pos
1524 			// is updated without a mutex. And the overhead should
1525 			// be very much negligible anyway.
1526 			mythread_cond_signal(&coder->thr->cond);
1527 		}
1528 
1529 		// Read output from the output queue. Just like in
1530 		// SEQ_BLOCK_HEADER, we wait to fill the output buffer
1531 		// only if waiting_allowed was set to true in the beginning
1532 		// of this function (see the comment there) and there is
1533 		// no input available. In SEQ_BLOCK_HEADER, there is never
1534 		// input available when read_output_and_wait() is called,
1535 		// but here there can be when LZMA_FINISH is used, thus we
1536 		// need to check if *in_pos == in_size. Otherwise we would
1537 		// wait here instead of using the available input to start
1538 		// a new thread.
1539 		return_if_error(read_output_and_wait(coder, allocator,
1540 				out, out_pos, out_size,
1541 				NULL,
1542 				waiting_allowed && *in_pos == in_size,
1543 				&wait_abs, &has_blocked));
1544 
1545 		if (coder->pending_error != LZMA_OK) {
1546 			coder->sequence = SEQ_ERROR;
1547 			break;
1548 		}
1549 
1550 		// Return if the input didn't contain the whole Block.
1551 		//
1552 		// NOTE: When we updated coder->thr->in_filled a few lines
1553 		// above, the worker thread might by now have finished its
1554 		// work and returned itself back to the stack of free threads.
1555 		if (coder->thr->in_filled < coder->thr->in_size) {
1556 			assert(*in_pos == in_size);
1557 			return LZMA_OK;
1558 		}
1559 
1560 		// The whole Block has been copied to the thread-specific
1561 		// buffer. Continue from the next Block Header or Index.
1562 		coder->thr = NULL;
1563 		coder->sequence = SEQ_BLOCK_HEADER;
1564 		break;
1565 	}
1566 
1567 	case SEQ_BLOCK_DIRECT_INIT: {
1568 		// Wait for the threads to finish and that all decoded data
1569 		// has been copied to the output. That is, wait until the
1570 		// output queue becomes empty.
1571 		//
1572 		// NOTE: No need to check for coder->pending_error as
1573 		// we aren't consuming any input until the queue is empty
1574 		// and if there is a pending error, read_output_and_wait()
1575 		// will eventually return it before the queue is empty.
1576 		return_if_error(read_output_and_wait(coder, allocator,
1577 				out, out_pos, out_size,
1578 				NULL, true, &wait_abs, &has_blocked));
1579 		if (!lzma_outq_is_empty(&coder->outq))
1580 			return LZMA_OK;
1581 
1582 		// Free the cached output buffers.
1583 		lzma_outq_clear_cache(&coder->outq, allocator);
1584 
1585 		// Get rid of the worker threads, including the coder->threads
1586 		// array.
1587 		threads_end(coder, allocator);
1588 
1589 		// Initialize the Block decoder.
1590 		const lzma_ret ret = lzma_block_decoder_init(
1591 				&coder->block_decoder, allocator,
1592 				&coder->block_options);
1593 
1594 		// Free the allocated filter options since they are needed
1595 		// only to initialize the Block decoder.
1596 		lzma_filters_free(coder->filters, allocator);
1597 		coder->block_options.filters = NULL;
1598 
1599 		// Check if Block decoder initialization succeeded.
1600 		if (ret != LZMA_OK)
1601 			return ret;
1602 
1603 		// Make the memory usage visible to _memconfig().
1604 		coder->mem_direct_mode = coder->mem_next_filters;
1605 
1606 		coder->sequence = SEQ_BLOCK_DIRECT_RUN;
1607 		FALLTHROUGH;
1608 	}
1609 
1610 	case SEQ_BLOCK_DIRECT_RUN: {
1611 		const size_t in_old = *in_pos;
1612 		const size_t out_old = *out_pos;
1613 		const lzma_ret ret = coder->block_decoder.code(
1614 				coder->block_decoder.coder, allocator,
1615 				in, in_pos, in_size, out, out_pos, out_size,
1616 				action);
1617 		coder->progress_in += *in_pos - in_old;
1618 		coder->progress_out += *out_pos - out_old;
1619 
1620 		if (ret != LZMA_STREAM_END)
1621 			return ret;
1622 
1623 		// Block decoded successfully. Add the new size pair to
1624 		// the Index hash.
1625 		return_if_error(lzma_index_hash_append(coder->index_hash,
1626 				lzma_block_unpadded_size(
1627 					&coder->block_options),
1628 				coder->block_options.uncompressed_size));
1629 
1630 		coder->sequence = SEQ_BLOCK_HEADER;
1631 		break;
1632 	}
1633 
1634 	case SEQ_INDEX_WAIT_OUTPUT:
1635 		// Flush the output from all worker threads so that we can
1636 		// decode the Index without thinking about threading.
1637 		return_if_error(read_output_and_wait(coder, allocator,
1638 				out, out_pos, out_size,
1639 				NULL, true, &wait_abs, &has_blocked));
1640 
1641 		if (!lzma_outq_is_empty(&coder->outq))
1642 			return LZMA_OK;
1643 
1644 		coder->sequence = SEQ_INDEX_DECODE;
1645 		FALLTHROUGH;
1646 
1647 	case SEQ_INDEX_DECODE: {
1648 		// If we don't have any input, don't call
1649 		// lzma_index_hash_decode() since it would return
1650 		// LZMA_BUF_ERROR, which we must not do here.
1651 		if (*in_pos >= in_size)
1652 			return LZMA_OK;
1653 
1654 		// Decode the Index and compare it to the hash calculated
1655 		// from the sizes of the Blocks (if any).
1656 		const size_t in_old = *in_pos;
1657 		const lzma_ret ret = lzma_index_hash_decode(coder->index_hash,
1658 				in, in_pos, in_size);
1659 		coder->progress_in += *in_pos - in_old;
1660 		if (ret != LZMA_STREAM_END)
1661 			return ret;
1662 
1663 		coder->sequence = SEQ_STREAM_FOOTER;
1664 		FALLTHROUGH;
1665 	}
1666 
1667 	case SEQ_STREAM_FOOTER: {
1668 		// Copy the Stream Footer to the internal buffer.
1669 		const size_t in_old = *in_pos;
1670 		lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos,
1671 				LZMA_STREAM_HEADER_SIZE);
1672 		coder->progress_in += *in_pos - in_old;
1673 
1674 		// Return if we didn't get the whole Stream Footer yet.
1675 		if (coder->pos < LZMA_STREAM_HEADER_SIZE)
1676 			return LZMA_OK;
1677 
1678 		coder->pos = 0;
1679 
1680 		// Decode the Stream Footer. The decoder gives
1681 		// LZMA_FORMAT_ERROR if the magic bytes don't match,
1682 		// so convert that return code to LZMA_DATA_ERROR.
1683 		lzma_stream_flags footer_flags;
1684 		const lzma_ret ret = lzma_stream_footer_decode(
1685 				&footer_flags, coder->buffer);
1686 		if (ret != LZMA_OK)
1687 			return ret == LZMA_FORMAT_ERROR
1688 					? LZMA_DATA_ERROR : ret;
1689 
1690 		// Check that Index Size stored in the Stream Footer matches
1691 		// the real size of the Index field.
1692 		if (lzma_index_hash_size(coder->index_hash)
1693 				!= footer_flags.backward_size)
1694 			return LZMA_DATA_ERROR;
1695 
1696 		// Compare that the Stream Flags fields are identical in
1697 		// both Stream Header and Stream Footer.
1698 		return_if_error(lzma_stream_flags_compare(
1699 				&coder->stream_flags, &footer_flags));
1700 
1701 		if (!coder->concatenated)
1702 			return LZMA_STREAM_END;
1703 
1704 		coder->sequence = SEQ_STREAM_PADDING;
1705 		FALLTHROUGH;
1706 	}
1707 
1708 	case SEQ_STREAM_PADDING:
1709 		assert(coder->concatenated);
1710 
1711 		// Skip over possible Stream Padding.
1712 		while (true) {
1713 			if (*in_pos >= in_size) {
1714 				// Unless LZMA_FINISH was used, we cannot
1715 				// know if there's more input coming later.
1716 				if (action != LZMA_FINISH)
1717 					return LZMA_OK;
1718 
1719 				// Stream Padding must be a multiple of
1720 				// four bytes.
1721 				return coder->pos == 0
1722 						? LZMA_STREAM_END
1723 						: LZMA_DATA_ERROR;
1724 			}
1725 
1726 			// If the byte is not zero, it probably indicates
1727 			// beginning of a new Stream (or the file is corrupt).
1728 			if (in[*in_pos] != 0x00)
1729 				break;
1730 
1731 			++*in_pos;
1732 			++coder->progress_in;
1733 			coder->pos = (coder->pos + 1) & 3;
1734 		}
1735 
1736 		// Stream Padding must be a multiple of four bytes (empty
1737 		// Stream Padding is OK).
1738 		if (coder->pos != 0) {
1739 			++*in_pos;
1740 			++coder->progress_in;
1741 			return LZMA_DATA_ERROR;
1742 		}
1743 
1744 		// Prepare to decode the next Stream.
1745 		return_if_error(stream_decoder_reset(coder, allocator));
1746 		break;
1747 
1748 	case SEQ_ERROR:
1749 		if (!coder->fail_fast) {
1750 			// Let the application get all data before the point
1751 			// where the error was detected. This matches the
1752 			// behavior of single-threaded use.
1753 			//
1754 			// FIXME? Some errors (LZMA_MEM_ERROR) don't get here,
1755 			// they are returned immediately. Thus in rare cases
1756 			// the output will be less than in the single-threaded
1757 			// mode. Maybe this doesn't matter much in practice.
1758 			return_if_error(read_output_and_wait(coder, allocator,
1759 					out, out_pos, out_size,
1760 					NULL, true, &wait_abs, &has_blocked));
1761 
1762 			// We get here only if the error happened in the main
1763 			// thread, for example, unsupported Block Header.
1764 			if (!lzma_outq_is_empty(&coder->outq))
1765 				return LZMA_OK;
1766 		}
1767 
1768 		// We only get here if no errors were detected by the worker
1769 		// threads. Errors from worker threads would have already been
1770 		// returned by the call to read_output_and_wait() above.
1771 		return coder->pending_error;
1772 
1773 	default:
1774 		assert(0);
1775 		return LZMA_PROG_ERROR;
1776 	}
1777 
1778 	// Never reached
1779 }
1780 
1781 
1782 static void
stream_decoder_mt_end(void * coder_ptr,const lzma_allocator * allocator)1783 stream_decoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
1784 {
1785 	struct lzma_stream_coder *coder = coder_ptr;
1786 
1787 	threads_end(coder, allocator);
1788 	lzma_outq_end(&coder->outq, allocator);
1789 
1790 	lzma_next_end(&coder->block_decoder, allocator);
1791 	lzma_filters_free(coder->filters, allocator);
1792 	lzma_index_hash_end(coder->index_hash, allocator);
1793 
1794 	lzma_free(coder, allocator);
1795 	return;
1796 }
1797 
1798 
1799 static lzma_check
stream_decoder_mt_get_check(const void * coder_ptr)1800 stream_decoder_mt_get_check(const void *coder_ptr)
1801 {
1802 	const struct lzma_stream_coder *coder = coder_ptr;
1803 	return coder->stream_flags.check;
1804 }
1805 
1806 
1807 static lzma_ret
stream_decoder_mt_memconfig(void * coder_ptr,uint64_t * memusage,uint64_t * old_memlimit,uint64_t new_memlimit)1808 stream_decoder_mt_memconfig(void *coder_ptr, uint64_t *memusage,
1809 		uint64_t *old_memlimit, uint64_t new_memlimit)
1810 {
1811 	// NOTE: This function gets/sets memlimit_stop. For now,
1812 	// memlimit_threading cannot be modified after initialization.
1813 	//
1814 	// *memusage will include cached memory too. Excluding cached memory
1815 	// would be misleading and it wouldn't help the applications to
1816 	// know how much memory is actually needed to decompress the file
1817 	// because the higher the number of threads and the memlimits are
1818 	// the more memory the decoder may use.
1819 	//
1820 	// Setting a new limit includes the cached memory too and too low
1821 	// limits will be rejected. Alternative could be to free the cached
1822 	// memory immediately if that helps to bring the limit down but
1823 	// the current way is the simplest. It's unlikely that limit needs
1824 	// to be lowered in the middle of a file anyway; the typical reason
1825 	// to want a new limit is to increase after LZMA_MEMLIMIT_ERROR
1826 	// and even such use isn't common.
1827 	struct lzma_stream_coder *coder = coder_ptr;
1828 
1829 	mythread_sync(coder->mutex) {
1830 		*memusage = coder->mem_direct_mode
1831 				+ coder->mem_in_use
1832 				+ coder->mem_cached
1833 				+ coder->outq.mem_allocated;
1834 	}
1835 
1836 	// If no filter chains are allocated, *memusage may be zero.
1837 	// Always return at least LZMA_MEMUSAGE_BASE.
1838 	if (*memusage < LZMA_MEMUSAGE_BASE)
1839 		*memusage = LZMA_MEMUSAGE_BASE;
1840 
1841 	*old_memlimit = coder->memlimit_stop;
1842 
1843 	if (new_memlimit != 0) {
1844 		if (new_memlimit < *memusage)
1845 			return LZMA_MEMLIMIT_ERROR;
1846 
1847 		coder->memlimit_stop = new_memlimit;
1848 	}
1849 
1850 	return LZMA_OK;
1851 }
1852 
1853 
1854 static void
stream_decoder_mt_get_progress(void * coder_ptr,uint64_t * progress_in,uint64_t * progress_out)1855 stream_decoder_mt_get_progress(void *coder_ptr,
1856 		uint64_t *progress_in, uint64_t *progress_out)
1857 {
1858 	struct lzma_stream_coder *coder = coder_ptr;
1859 
1860 	// Lock coder->mutex to prevent finishing threads from moving their
1861 	// progress info from the worker_thread structure to lzma_stream_coder.
1862 	mythread_sync(coder->mutex) {
1863 		*progress_in = coder->progress_in;
1864 		*progress_out = coder->progress_out;
1865 
1866 		for (size_t i = 0; i < coder->threads_initialized; ++i) {
1867 			mythread_sync(coder->threads[i].mutex) {
1868 				*progress_in += coder->threads[i].progress_in;
1869 				*progress_out += coder->threads[i]
1870 						.progress_out;
1871 			}
1872 		}
1873 	}
1874 
1875 	return;
1876 }
1877 
1878 
1879 static lzma_ret
stream_decoder_mt_init(lzma_next_coder * next,const lzma_allocator * allocator,const lzma_mt * options)1880 stream_decoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
1881 		       const lzma_mt *options)
1882 {
1883 	struct lzma_stream_coder *coder;
1884 
1885 	if (options->threads == 0 || options->threads > LZMA_THREADS_MAX)
1886 		return LZMA_OPTIONS_ERROR;
1887 
1888 	if (options->flags & ~LZMA_SUPPORTED_FLAGS)
1889 		return LZMA_OPTIONS_ERROR;
1890 
1891 	lzma_next_coder_init(&stream_decoder_mt_init, next, allocator);
1892 
1893 	coder = next->coder;
1894 	if (!coder) {
1895 		coder = lzma_alloc(sizeof(struct lzma_stream_coder), allocator);
1896 		if (coder == NULL)
1897 			return LZMA_MEM_ERROR;
1898 
1899 		next->coder = coder;
1900 
1901 		if (mythread_mutex_init(&coder->mutex)) {
1902 			lzma_free(coder, allocator);
1903 			return LZMA_MEM_ERROR;
1904 		}
1905 
1906 		if (mythread_cond_init(&coder->cond)) {
1907 			mythread_mutex_destroy(&coder->mutex);
1908 			lzma_free(coder, allocator);
1909 			return LZMA_MEM_ERROR;
1910 		}
1911 
1912 		next->code = &stream_decode_mt;
1913 		next->end = &stream_decoder_mt_end;
1914 		next->get_check = &stream_decoder_mt_get_check;
1915 		next->memconfig = &stream_decoder_mt_memconfig;
1916 		next->get_progress = &stream_decoder_mt_get_progress;
1917 
1918 		coder->filters[0].id = LZMA_VLI_UNKNOWN;
1919 		memzero(&coder->outq, sizeof(coder->outq));
1920 
1921 		coder->block_decoder = LZMA_NEXT_CODER_INIT;
1922 		coder->mem_direct_mode = 0;
1923 
1924 		coder->index_hash = NULL;
1925 		coder->threads = NULL;
1926 		coder->threads_free = NULL;
1927 		coder->threads_initialized = 0;
1928 	}
1929 
1930 	// Cleanup old filter chain if one remains after unfinished decoding
1931 	// of a previous Stream.
1932 	lzma_filters_free(coder->filters, allocator);
1933 
1934 	// By allocating threads from scratch we can start memory-usage
1935 	// accounting from scratch, too. Changes in filter and block sizes may
1936 	// affect number of threads.
1937 	//
1938 	// Reusing threads doesn't seem worth it. Unlike the single-threaded
1939 	// decoder, with some types of input file combinations reusing
1940 	// could leave quite a lot of memory allocated but unused (first
1941 	// file could allocate a lot, the next files could use fewer
1942 	// threads and some of the allocations from the first file would not
1943 	// get freed unless memlimit_threading forces us to clear caches).
1944 	//
1945 	// NOTE: The direct mode decoder isn't freed here if one exists.
1946 	// It will be reused or freed as needed in the main loop.
1947 	threads_end(coder, allocator);
1948 
1949 	// All memusage counters start at 0 (including mem_direct_mode).
1950 	// The little extra that is needed for the structs in this file
1951 	// get accounted well enough by the filter chain memory usage
1952 	// which adds LZMA_MEMUSAGE_BASE for each chain. However,
1953 	// stream_decoder_mt_memconfig() has to handle this specially so that
1954 	// it will never return less than LZMA_MEMUSAGE_BASE as memory usage.
1955 	coder->mem_in_use = 0;
1956 	coder->mem_cached = 0;
1957 	coder->mem_next_block = 0;
1958 
1959 	coder->progress_in = 0;
1960 	coder->progress_out = 0;
1961 
1962 	coder->sequence = SEQ_STREAM_HEADER;
1963 	coder->thread_error = LZMA_OK;
1964 	coder->pending_error = LZMA_OK;
1965 	coder->thr = NULL;
1966 
1967 	coder->timeout = options->timeout;
1968 
1969 	coder->memlimit_threading = my_max(1, options->memlimit_threading);
1970 	coder->memlimit_stop = my_max(1, options->memlimit_stop);
1971 	if (coder->memlimit_threading > coder->memlimit_stop)
1972 		coder->memlimit_threading = coder->memlimit_stop;
1973 
1974 	coder->tell_no_check = (options->flags & LZMA_TELL_NO_CHECK) != 0;
1975 	coder->tell_unsupported_check
1976 			= (options->flags & LZMA_TELL_UNSUPPORTED_CHECK) != 0;
1977 	coder->tell_any_check = (options->flags & LZMA_TELL_ANY_CHECK) != 0;
1978 	coder->ignore_check = (options->flags & LZMA_IGNORE_CHECK) != 0;
1979 	coder->concatenated = (options->flags & LZMA_CONCATENATED) != 0;
1980 	coder->fail_fast = (options->flags & LZMA_FAIL_FAST) != 0;
1981 
1982 	coder->first_stream = true;
1983 	coder->out_was_filled = false;
1984 	coder->pos = 0;
1985 
1986 	coder->threads_max = options->threads;
1987 
1988 	return_if_error(lzma_outq_init(&coder->outq, allocator,
1989 				       coder->threads_max));
1990 
1991 	return stream_decoder_reset(coder, allocator);
1992 }
1993 
1994 
1995 extern LZMA_API(lzma_ret)
lzma_stream_decoder_mt(lzma_stream * strm,const lzma_mt * options)1996 lzma_stream_decoder_mt(lzma_stream *strm, const lzma_mt *options)
1997 {
1998 	lzma_next_strm_init(stream_decoder_mt_init, strm, options);
1999 
2000 	strm->internal->supported_actions[LZMA_RUN] = true;
2001 	strm->internal->supported_actions[LZMA_FINISH] = true;
2002 
2003 	return LZMA_OK;
2004 }
2005