xref: /freebsd/contrib/xz/src/liblzma/common/stream_encoder_mt.c (revision fee65b7e21eeeb625dcaba8cf2b9e0cf5af83498)
1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 /// \file       stream_encoder_mt.c
4 /// \brief      Multithreaded .xz Stream encoder
5 //
6 //  Author:     Lasse Collin
7 //
8 //  This file has been put into the public domain.
9 //  You can do whatever you want with this file.
10 //
11 ///////////////////////////////////////////////////////////////////////////////
12 
13 #include "filter_encoder.h"
14 #include "easy_preset.h"
15 #include "block_encoder.h"
16 #include "block_buffer_encoder.h"
17 #include "index_encoder.h"
18 #include "outqueue.h"
19 
20 
21 /// Maximum supported block size. This makes it simpler to prevent integer
22 /// overflows if we are given unusually large block size.
23 #define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
24 
25 
26 typedef enum {
27 	/// Waiting for work.
28 	THR_IDLE,
29 
30 	/// Encoding is in progress.
31 	THR_RUN,
32 
33 	/// Encoding is in progress but no more input data will
34 	/// be read.
35 	THR_FINISH,
36 
37 	/// The main thread wants the thread to stop whatever it was doing
38 	/// but not exit.
39 	THR_STOP,
40 
41 	/// The main thread wants the thread to exit. We could use
42 	/// cancellation but since there's stopped anyway, this is lazier.
43 	THR_EXIT,
44 
45 } worker_state;
46 
47 typedef struct lzma_stream_coder_s lzma_stream_coder;
48 
49 typedef struct worker_thread_s worker_thread;
50 struct worker_thread_s {
51 	worker_state state;
52 
53 	/// Input buffer of coder->block_size bytes. The main thread will
54 	/// put new input into this and update in_size accordingly. Once
55 	/// no more input is coming, state will be set to THR_FINISH.
56 	uint8_t *in;
57 
58 	/// Amount of data available in the input buffer. This is modified
59 	/// only by the main thread.
60 	size_t in_size;
61 
62 	/// Output buffer for this thread. This is set by the main
63 	/// thread every time a new Block is started with this thread
64 	/// structure.
65 	lzma_outbuf *outbuf;
66 
67 	/// Pointer to the main structure is needed when putting this
68 	/// thread back to the stack of free threads.
69 	lzma_stream_coder *coder;
70 
71 	/// The allocator is set by the main thread. Since a copy of the
72 	/// pointer is kept here, the application must not change the
73 	/// allocator before calling lzma_end().
74 	const lzma_allocator *allocator;
75 
76 	/// Amount of uncompressed data that has already been compressed.
77 	uint64_t progress_in;
78 
79 	/// Amount of compressed data that is ready.
80 	uint64_t progress_out;
81 
82 	/// Block encoder
83 	lzma_next_coder block_encoder;
84 
85 	/// Compression options for this Block
86 	lzma_block block_options;
87 
88 	/// Filter chain for this thread. By copying the filters array
89 	/// to each thread it is possible to change the filter chain
90 	/// between Blocks using lzma_filters_update().
91 	lzma_filter filters[LZMA_FILTERS_MAX + 1];
92 
93 	/// Next structure in the stack of free worker threads.
94 	worker_thread *next;
95 
96 	mythread_mutex mutex;
97 	mythread_cond cond;
98 
99 	/// The ID of this thread is used to join the thread
100 	/// when it's not needed anymore.
101 	mythread thread_id;
102 };
103 
104 
105 struct lzma_stream_coder_s {
106 	enum {
107 		SEQ_STREAM_HEADER,
108 		SEQ_BLOCK,
109 		SEQ_INDEX,
110 		SEQ_STREAM_FOOTER,
111 	} sequence;
112 
113 	/// Start a new Block every block_size bytes of input unless
114 	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
115 	size_t block_size;
116 
117 	/// The filter chain to use for the next Block.
118 	/// This can be updated using lzma_filters_update()
119 	/// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
120 	lzma_filter filters[LZMA_FILTERS_MAX + 1];
121 
122 	/// A copy of filters[] will be put here when attempting to get
123 	/// a new worker thread. This will be copied to a worker thread
124 	/// when a thread becomes free and then this cache is marked as
125 	/// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
126 	/// the filter options from filters[] would get uselessly copied
127 	/// multiple times (allocated and freed) when waiting for a new free
128 	/// worker thread.
129 	///
130 	/// This is freed if filters[] is updated via lzma_filters_update().
131 	lzma_filter filters_cache[LZMA_FILTERS_MAX + 1];
132 
133 
134 	/// Index to hold sizes of the Blocks
135 	lzma_index *index;
136 
137 	/// Index encoder
138 	lzma_next_coder index_encoder;
139 
140 
141 	/// Stream Flags for encoding the Stream Header and Stream Footer.
142 	lzma_stream_flags stream_flags;
143 
144 	/// Buffer to hold Stream Header and Stream Footer.
145 	uint8_t header[LZMA_STREAM_HEADER_SIZE];
146 
147 	/// Read position in header[]
148 	size_t header_pos;
149 
150 
151 	/// Output buffer queue for compressed data
152 	lzma_outq outq;
153 
154 	/// How much memory to allocate for each lzma_outbuf.buf
155 	size_t outbuf_alloc_size;
156 
157 
158 	/// Maximum wait time if cannot use all the input and cannot
159 	/// fill the output buffer. This is in milliseconds.
160 	uint32_t timeout;
161 
162 
163 	/// Error code from a worker thread
164 	lzma_ret thread_error;
165 
166 	/// Array of allocated thread-specific structures
167 	worker_thread *threads;
168 
169 	/// Number of structures in "threads" above. This is also the
170 	/// number of threads that will be created at maximum.
171 	uint32_t threads_max;
172 
173 	/// Number of thread structures that have been initialized, and
174 	/// thus the number of worker threads actually created so far.
175 	uint32_t threads_initialized;
176 
177 	/// Stack of free threads. When a thread finishes, it puts itself
178 	/// back into this stack. This starts as empty because threads
179 	/// are created only when actually needed.
180 	worker_thread *threads_free;
181 
182 	/// The most recent worker thread to which the main thread writes
183 	/// the new input from the application.
184 	worker_thread *thr;
185 
186 
187 	/// Amount of uncompressed data in Blocks that have already
188 	/// been finished.
189 	uint64_t progress_in;
190 
191 	/// Amount of compressed data in Stream Header + Blocks that
192 	/// have already been finished.
193 	uint64_t progress_out;
194 
195 
196 	mythread_mutex mutex;
197 	mythread_cond cond;
198 };
199 
200 
201 /// Tell the main thread that something has gone wrong.
202 static void
203 worker_error(worker_thread *thr, lzma_ret ret)
204 {
205 	assert(ret != LZMA_OK);
206 	assert(ret != LZMA_STREAM_END);
207 
208 	mythread_sync(thr->coder->mutex) {
209 		if (thr->coder->thread_error == LZMA_OK)
210 			thr->coder->thread_error = ret;
211 
212 		mythread_cond_signal(&thr->coder->cond);
213 	}
214 
215 	return;
216 }
217 
218 
219 static worker_state
220 worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
221 {
222 	assert(thr->progress_in == 0);
223 	assert(thr->progress_out == 0);
224 
225 	// Set the Block options.
226 	thr->block_options = (lzma_block){
227 		.version = 0,
228 		.check = thr->coder->stream_flags.check,
229 		.compressed_size = thr->outbuf->allocated,
230 		.uncompressed_size = thr->coder->block_size,
231 		.filters = thr->filters,
232 	};
233 
234 	// Calculate maximum size of the Block Header. This amount is
235 	// reserved in the beginning of the buffer so that Block Header
236 	// along with Compressed Size and Uncompressed Size can be
237 	// written there.
238 	lzma_ret ret = lzma_block_header_size(&thr->block_options);
239 	if (ret != LZMA_OK) {
240 		worker_error(thr, ret);
241 		return THR_STOP;
242 	}
243 
244 	// Initialize the Block encoder.
245 	ret = lzma_block_encoder_init(&thr->block_encoder,
246 			thr->allocator, &thr->block_options);
247 	if (ret != LZMA_OK) {
248 		worker_error(thr, ret);
249 		return THR_STOP;
250 	}
251 
252 	size_t in_pos = 0;
253 	size_t in_size = 0;
254 
255 	*out_pos = thr->block_options.header_size;
256 	const size_t out_size = thr->outbuf->allocated;
257 
258 	do {
259 		mythread_sync(thr->mutex) {
260 			// Store in_pos and *out_pos into *thr so that
261 			// an application may read them via
262 			// lzma_get_progress() to get progress information.
263 			//
264 			// NOTE: These aren't updated when the encoding
265 			// finishes. Instead, the final values are taken
266 			// later from thr->outbuf.
267 			thr->progress_in = in_pos;
268 			thr->progress_out = *out_pos;
269 
270 			while (in_size == thr->in_size
271 					&& thr->state == THR_RUN)
272 				mythread_cond_wait(&thr->cond, &thr->mutex);
273 
274 			state = thr->state;
275 			in_size = thr->in_size;
276 		}
277 
278 		// Return if we were asked to stop or exit.
279 		if (state >= THR_STOP)
280 			return state;
281 
282 		lzma_action action = state == THR_FINISH
283 				? LZMA_FINISH : LZMA_RUN;
284 
285 		// Limit the amount of input given to the Block encoder
286 		// at once. This way this thread can react fairly quickly
287 		// if the main thread wants us to stop or exit.
288 		static const size_t in_chunk_max = 16384;
289 		size_t in_limit = in_size;
290 		if (in_size - in_pos > in_chunk_max) {
291 			in_limit = in_pos + in_chunk_max;
292 			action = LZMA_RUN;
293 		}
294 
295 		ret = thr->block_encoder.code(
296 				thr->block_encoder.coder, thr->allocator,
297 				thr->in, &in_pos, in_limit, thr->outbuf->buf,
298 				out_pos, out_size, action);
299 	} while (ret == LZMA_OK && *out_pos < out_size);
300 
301 	switch (ret) {
302 	case LZMA_STREAM_END:
303 		assert(state == THR_FINISH);
304 
305 		// Encode the Block Header. By doing it after
306 		// the compression, we can store the Compressed Size
307 		// and Uncompressed Size fields.
308 		ret = lzma_block_header_encode(&thr->block_options,
309 				thr->outbuf->buf);
310 		if (ret != LZMA_OK) {
311 			worker_error(thr, ret);
312 			return THR_STOP;
313 		}
314 
315 		break;
316 
317 	case LZMA_OK:
318 		// The data was incompressible. Encode it using uncompressed
319 		// LZMA2 chunks.
320 		//
321 		// First wait that we have gotten all the input.
322 		mythread_sync(thr->mutex) {
323 			while (thr->state == THR_RUN)
324 				mythread_cond_wait(&thr->cond, &thr->mutex);
325 
326 			state = thr->state;
327 			in_size = thr->in_size;
328 		}
329 
330 		if (state >= THR_STOP)
331 			return state;
332 
333 		// Do the encoding. This takes care of the Block Header too.
334 		*out_pos = 0;
335 		ret = lzma_block_uncomp_encode(&thr->block_options,
336 				thr->in, in_size, thr->outbuf->buf,
337 				out_pos, out_size);
338 
339 		// It shouldn't fail.
340 		if (ret != LZMA_OK) {
341 			worker_error(thr, LZMA_PROG_ERROR);
342 			return THR_STOP;
343 		}
344 
345 		break;
346 
347 	default:
348 		worker_error(thr, ret);
349 		return THR_STOP;
350 	}
351 
352 	// Set the size information that will be read by the main thread
353 	// to write the Index field.
354 	thr->outbuf->unpadded_size
355 			= lzma_block_unpadded_size(&thr->block_options);
356 	assert(thr->outbuf->unpadded_size != 0);
357 	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
358 
359 	return THR_FINISH;
360 }
361 
362 
363 static MYTHREAD_RET_TYPE
364 worker_start(void *thr_ptr)
365 {
366 	worker_thread *thr = thr_ptr;
367 	worker_state state = THR_IDLE; // Init to silence a warning
368 
369 	while (true) {
370 		// Wait for work.
371 		mythread_sync(thr->mutex) {
372 			while (true) {
373 				// The thread is already idle so if we are
374 				// requested to stop, just set the state.
375 				if (thr->state == THR_STOP) {
376 					thr->state = THR_IDLE;
377 					mythread_cond_signal(&thr->cond);
378 				}
379 
380 				state = thr->state;
381 				if (state != THR_IDLE)
382 					break;
383 
384 				mythread_cond_wait(&thr->cond, &thr->mutex);
385 			}
386 		}
387 
388 		size_t out_pos = 0;
389 
390 		assert(state != THR_IDLE);
391 		assert(state != THR_STOP);
392 
393 		if (state <= THR_FINISH)
394 			state = worker_encode(thr, &out_pos, state);
395 
396 		if (state == THR_EXIT)
397 			break;
398 
399 		// Mark the thread as idle unless the main thread has
400 		// told us to exit. Signal is needed for the case
401 		// where the main thread is waiting for the threads to stop.
402 		mythread_sync(thr->mutex) {
403 			if (thr->state != THR_EXIT) {
404 				thr->state = THR_IDLE;
405 				mythread_cond_signal(&thr->cond);
406 			}
407 		}
408 
409 		mythread_sync(thr->coder->mutex) {
410 			// If no errors occurred, make the encoded data
411 			// available to be copied out.
412 			if (state == THR_FINISH) {
413 				thr->outbuf->pos = out_pos;
414 				thr->outbuf->finished = true;
415 			}
416 
417 			// Update the main progress info.
418 			thr->coder->progress_in
419 					+= thr->outbuf->uncompressed_size;
420 			thr->coder->progress_out += out_pos;
421 			thr->progress_in = 0;
422 			thr->progress_out = 0;
423 
424 			// Return this thread to the stack of free threads.
425 			thr->next = thr->coder->threads_free;
426 			thr->coder->threads_free = thr;
427 
428 			mythread_cond_signal(&thr->coder->cond);
429 		}
430 	}
431 
432 	// Exiting, free the resources.
433 	lzma_filters_free(thr->filters, thr->allocator);
434 
435 	mythread_mutex_destroy(&thr->mutex);
436 	mythread_cond_destroy(&thr->cond);
437 
438 	lzma_next_end(&thr->block_encoder, thr->allocator);
439 	lzma_free(thr->in, thr->allocator);
440 	return MYTHREAD_RET_VALUE;
441 }
442 
443 
444 /// Make the threads stop but not exit. Optionally wait for them to stop.
445 static void
446 threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
447 {
448 	// Tell the threads to stop.
449 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
450 		mythread_sync(coder->threads[i].mutex) {
451 			coder->threads[i].state = THR_STOP;
452 			mythread_cond_signal(&coder->threads[i].cond);
453 		}
454 	}
455 
456 	if (!wait_for_threads)
457 		return;
458 
459 	// Wait for the threads to settle in the idle state.
460 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
461 		mythread_sync(coder->threads[i].mutex) {
462 			while (coder->threads[i].state != THR_IDLE)
463 				mythread_cond_wait(&coder->threads[i].cond,
464 						&coder->threads[i].mutex);
465 		}
466 	}
467 
468 	return;
469 }
470 
471 
472 /// Stop the threads and free the resources associated with them.
473 /// Wait until the threads have exited.
474 static void
475 threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
476 {
477 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
478 		mythread_sync(coder->threads[i].mutex) {
479 			coder->threads[i].state = THR_EXIT;
480 			mythread_cond_signal(&coder->threads[i].cond);
481 		}
482 	}
483 
484 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
485 		int ret = mythread_join(coder->threads[i].thread_id);
486 		assert(ret == 0);
487 		(void)ret;
488 	}
489 
490 	lzma_free(coder->threads, allocator);
491 	return;
492 }
493 
494 
495 /// Initialize a new worker_thread structure and create a new thread.
496 static lzma_ret
497 initialize_new_thread(lzma_stream_coder *coder,
498 		const lzma_allocator *allocator)
499 {
500 	worker_thread *thr = &coder->threads[coder->threads_initialized];
501 
502 	thr->in = lzma_alloc(coder->block_size, allocator);
503 	if (thr->in == NULL)
504 		return LZMA_MEM_ERROR;
505 
506 	if (mythread_mutex_init(&thr->mutex))
507 		goto error_mutex;
508 
509 	if (mythread_cond_init(&thr->cond))
510 		goto error_cond;
511 
512 	thr->state = THR_IDLE;
513 	thr->allocator = allocator;
514 	thr->coder = coder;
515 	thr->progress_in = 0;
516 	thr->progress_out = 0;
517 	thr->block_encoder = LZMA_NEXT_CODER_INIT;
518 	thr->filters[0].id = LZMA_VLI_UNKNOWN;
519 
520 	if (mythread_create(&thr->thread_id, &worker_start, thr))
521 		goto error_thread;
522 
523 	++coder->threads_initialized;
524 	coder->thr = thr;
525 
526 	return LZMA_OK;
527 
528 error_thread:
529 	mythread_cond_destroy(&thr->cond);
530 
531 error_cond:
532 	mythread_mutex_destroy(&thr->mutex);
533 
534 error_mutex:
535 	lzma_free(thr->in, allocator);
536 	return LZMA_MEM_ERROR;
537 }
538 
539 
540 static lzma_ret
541 get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
542 {
543 	// If there are no free output subqueues, there is no
544 	// point to try getting a thread.
545 	if (!lzma_outq_has_buf(&coder->outq))
546 		return LZMA_OK;
547 
548 	// That's also true if we cannot allocate memory for the output
549 	// buffer in the output queue.
550 	return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
551 			coder->outbuf_alloc_size));
552 
553 	// Make a thread-specific copy of the filter chain. Put it in
554 	// the cache array first so that if we cannot get a new thread yet,
555 	// the allocation is ready when we try again.
556 	if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN)
557 		return_if_error(lzma_filters_copy(
558 			coder->filters, coder->filters_cache, allocator));
559 
560 	// If there is a free structure on the stack, use it.
561 	mythread_sync(coder->mutex) {
562 		if (coder->threads_free != NULL) {
563 			coder->thr = coder->threads_free;
564 			coder->threads_free = coder->threads_free->next;
565 		}
566 	}
567 
568 	if (coder->thr == NULL) {
569 		// If there are no uninitialized structures left, return.
570 		if (coder->threads_initialized == coder->threads_max)
571 			return LZMA_OK;
572 
573 		// Initialize a new thread.
574 		return_if_error(initialize_new_thread(coder, allocator));
575 	}
576 
577 	// Reset the parts of the thread state that have to be done
578 	// in the main thread.
579 	mythread_sync(coder->thr->mutex) {
580 		coder->thr->state = THR_RUN;
581 		coder->thr->in_size = 0;
582 		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);
583 
584 		// Free the old thread-specific filter options and replace
585 		// them with the already-allocated new options from
586 		// coder->filters_cache[]. Then mark the cache as empty.
587 		lzma_filters_free(coder->thr->filters, allocator);
588 		memcpy(coder->thr->filters, coder->filters_cache,
589 				sizeof(coder->filters_cache));
590 		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
591 
592 		mythread_cond_signal(&coder->thr->cond);
593 	}
594 
595 	return LZMA_OK;
596 }
597 
598 
599 static lzma_ret
600 stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
601 		const uint8_t *restrict in, size_t *restrict in_pos,
602 		size_t in_size, lzma_action action)
603 {
604 	while (*in_pos < in_size
605 			|| (coder->thr != NULL && action != LZMA_RUN)) {
606 		if (coder->thr == NULL) {
607 			// Get a new thread.
608 			const lzma_ret ret = get_thread(coder, allocator);
609 			if (coder->thr == NULL)
610 				return ret;
611 		}
612 
613 		// Copy the input data to thread's buffer.
614 		size_t thr_in_size = coder->thr->in_size;
615 		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
616 				&thr_in_size, coder->block_size);
617 
618 		// Tell the Block encoder to finish if
619 		//  - it has got block_size bytes of input; or
620 		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
621 		//    or LZMA_FULL_BARRIER was used.
622 		//
623 		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
624 		const bool finish = thr_in_size == coder->block_size
625 				|| (*in_pos == in_size && action != LZMA_RUN);
626 
627 		bool block_error = false;
628 
629 		mythread_sync(coder->thr->mutex) {
630 			if (coder->thr->state == THR_IDLE) {
631 				// Something has gone wrong with the Block
632 				// encoder. It has set coder->thread_error
633 				// which we will read a few lines later.
634 				block_error = true;
635 			} else {
636 				// Tell the Block encoder its new amount
637 				// of input and update the state if needed.
638 				coder->thr->in_size = thr_in_size;
639 
640 				if (finish)
641 					coder->thr->state = THR_FINISH;
642 
643 				mythread_cond_signal(&coder->thr->cond);
644 			}
645 		}
646 
647 		if (block_error) {
648 			lzma_ret ret = LZMA_OK; // Init to silence a warning.
649 
650 			mythread_sync(coder->mutex) {
651 				ret = coder->thread_error;
652 			}
653 
654 			return ret;
655 		}
656 
657 		if (finish)
658 			coder->thr = NULL;
659 	}
660 
661 	return LZMA_OK;
662 }
663 
664 
665 /// Wait until more input can be consumed, more output can be read, or
666 /// an optional timeout is reached.
667 static bool
668 wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
669 		bool *has_blocked, bool has_input)
670 {
671 	if (coder->timeout != 0 && !*has_blocked) {
672 		// Every time when stream_encode_mt() is called via
673 		// lzma_code(), *has_blocked starts as false. We set it
674 		// to true here and calculate the absolute time when
675 		// we must return if there's nothing to do.
676 		//
677 		// This way if we block multiple times for short moments
678 		// less than "timeout" milliseconds, we will return once
679 		// "timeout" amount of time has passed since the *first*
680 		// blocking occurred. If the absolute time was calculated
681 		// again every time we block, "timeout" would effectively
682 		// be meaningless if we never consecutively block longer
683 		// than "timeout" ms.
684 		*has_blocked = true;
685 		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
686 	}
687 
688 	bool timed_out = false;
689 
690 	mythread_sync(coder->mutex) {
691 		// There are four things that we wait. If one of them
692 		// becomes possible, we return.
693 		//  - If there is input left, we need to get a free
694 		//    worker thread and an output buffer for it.
695 		//  - Data ready to be read from the output queue.
696 		//  - A worker thread indicates an error.
697 		//  - Time out occurs.
698 		while ((!has_input || coder->threads_free == NULL
699 					|| !lzma_outq_has_buf(&coder->outq))
700 				&& !lzma_outq_is_readable(&coder->outq)
701 				&& coder->thread_error == LZMA_OK
702 				&& !timed_out) {
703 			if (coder->timeout != 0)
704 				timed_out = mythread_cond_timedwait(
705 						&coder->cond, &coder->mutex,
706 						wait_abs) != 0;
707 			else
708 				mythread_cond_wait(&coder->cond,
709 						&coder->mutex);
710 		}
711 	}
712 
713 	return timed_out;
714 }
715 
716 
717 static lzma_ret
718 stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
719 		const uint8_t *restrict in, size_t *restrict in_pos,
720 		size_t in_size, uint8_t *restrict out,
721 		size_t *restrict out_pos, size_t out_size, lzma_action action)
722 {
723 	lzma_stream_coder *coder = coder_ptr;
724 
725 	switch (coder->sequence) {
726 	case SEQ_STREAM_HEADER:
727 		lzma_bufcpy(coder->header, &coder->header_pos,
728 				sizeof(coder->header),
729 				out, out_pos, out_size);
730 		if (coder->header_pos < sizeof(coder->header))
731 			return LZMA_OK;
732 
733 		coder->header_pos = 0;
734 		coder->sequence = SEQ_BLOCK;
735 
736 	// Fall through
737 
738 	case SEQ_BLOCK: {
739 		// Initialized to silence warnings.
740 		lzma_vli unpadded_size = 0;
741 		lzma_vli uncompressed_size = 0;
742 		lzma_ret ret = LZMA_OK;
743 
744 		// These are for wait_for_work().
745 		bool has_blocked = false;
746 		mythread_condtime wait_abs;
747 
748 		while (true) {
749 			mythread_sync(coder->mutex) {
750 				// Check for Block encoder errors.
751 				ret = coder->thread_error;
752 				if (ret != LZMA_OK) {
753 					assert(ret != LZMA_STREAM_END);
754 					break; // Break out of mythread_sync.
755 				}
756 
757 				// Try to read compressed data to out[].
758 				ret = lzma_outq_read(&coder->outq, allocator,
759 						out, out_pos, out_size,
760 						&unpadded_size,
761 						&uncompressed_size);
762 			}
763 
764 			if (ret == LZMA_STREAM_END) {
765 				// End of Block. Add it to the Index.
766 				ret = lzma_index_append(coder->index,
767 						allocator, unpadded_size,
768 						uncompressed_size);
769 				if (ret != LZMA_OK) {
770 					threads_stop(coder, false);
771 					return ret;
772 				}
773 
774 				// If we didn't fill the output buffer yet,
775 				// try to read more data. Maybe the next
776 				// outbuf has been finished already too.
777 				if (*out_pos < out_size)
778 					continue;
779 			}
780 
781 			if (ret != LZMA_OK) {
782 				// coder->thread_error was set.
783 				threads_stop(coder, false);
784 				return ret;
785 			}
786 
787 			// Try to give uncompressed data to a worker thread.
788 			ret = stream_encode_in(coder, allocator,
789 					in, in_pos, in_size, action);
790 			if (ret != LZMA_OK) {
791 				threads_stop(coder, false);
792 				return ret;
793 			}
794 
795 			// See if we should wait or return.
796 			//
797 			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
798 			if (*in_pos == in_size) {
799 				// LZMA_RUN: More data is probably coming
800 				// so return to let the caller fill the
801 				// input buffer.
802 				if (action == LZMA_RUN)
803 					return LZMA_OK;
804 
805 				// LZMA_FULL_BARRIER: The same as with
806 				// LZMA_RUN but tell the caller that the
807 				// barrier was completed.
808 				if (action == LZMA_FULL_BARRIER)
809 					return LZMA_STREAM_END;
810 
811 				// Finishing or flushing isn't completed until
812 				// all input data has been encoded and copied
813 				// to the output buffer.
814 				if (lzma_outq_is_empty(&coder->outq)) {
815 					// LZMA_FINISH: Continue to encode
816 					// the Index field.
817 					if (action == LZMA_FINISH)
818 						break;
819 
820 					// LZMA_FULL_FLUSH: Return to tell
821 					// the caller that flushing was
822 					// completed.
823 					if (action == LZMA_FULL_FLUSH)
824 						return LZMA_STREAM_END;
825 				}
826 			}
827 
828 			// Return if there is no output space left.
829 			// This check must be done after testing the input
830 			// buffer, because we might want to use a different
831 			// return code.
832 			if (*out_pos == out_size)
833 				return LZMA_OK;
834 
835 			// Neither in nor out has been used completely.
836 			// Wait until there's something we can do.
837 			if (wait_for_work(coder, &wait_abs, &has_blocked,
838 					*in_pos < in_size))
839 				return LZMA_TIMED_OUT;
840 		}
841 
842 		// All Blocks have been encoded and the threads have stopped.
843 		// Prepare to encode the Index field.
844 		return_if_error(lzma_index_encoder_init(
845 				&coder->index_encoder, allocator,
846 				coder->index));
847 		coder->sequence = SEQ_INDEX;
848 
849 		// Update the progress info to take the Index and
850 		// Stream Footer into account. Those are very fast to encode
851 		// so in terms of progress information they can be thought
852 		// to be ready to be copied out.
853 		coder->progress_out += lzma_index_size(coder->index)
854 				+ LZMA_STREAM_HEADER_SIZE;
855 	}
856 
857 	// Fall through
858 
859 	case SEQ_INDEX: {
860 		// Call the Index encoder. It doesn't take any input, so
861 		// those pointers can be NULL.
862 		const lzma_ret ret = coder->index_encoder.code(
863 				coder->index_encoder.coder, allocator,
864 				NULL, NULL, 0,
865 				out, out_pos, out_size, LZMA_RUN);
866 		if (ret != LZMA_STREAM_END)
867 			return ret;
868 
869 		// Encode the Stream Footer into coder->buffer.
870 		coder->stream_flags.backward_size
871 				= lzma_index_size(coder->index);
872 		if (lzma_stream_footer_encode(&coder->stream_flags,
873 				coder->header) != LZMA_OK)
874 			return LZMA_PROG_ERROR;
875 
876 		coder->sequence = SEQ_STREAM_FOOTER;
877 	}
878 
879 	// Fall through
880 
881 	case SEQ_STREAM_FOOTER:
882 		lzma_bufcpy(coder->header, &coder->header_pos,
883 				sizeof(coder->header),
884 				out, out_pos, out_size);
885 		return coder->header_pos < sizeof(coder->header)
886 				? LZMA_OK : LZMA_STREAM_END;
887 	}
888 
889 	assert(0);
890 	return LZMA_PROG_ERROR;
891 }
892 
893 
894 static void
895 stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
896 {
897 	lzma_stream_coder *coder = coder_ptr;
898 
899 	// Threads must be killed before the output queue can be freed.
900 	threads_end(coder, allocator);
901 	lzma_outq_end(&coder->outq, allocator);
902 
903 	lzma_filters_free(coder->filters, allocator);
904 	lzma_filters_free(coder->filters_cache, allocator);
905 
906 	lzma_next_end(&coder->index_encoder, allocator);
907 	lzma_index_end(coder->index, allocator);
908 
909 	mythread_cond_destroy(&coder->cond);
910 	mythread_mutex_destroy(&coder->mutex);
911 
912 	lzma_free(coder, allocator);
913 	return;
914 }
915 
916 
917 static lzma_ret
918 stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator,
919 		const lzma_filter *filters,
920 		const lzma_filter *reversed_filters
921 			lzma_attribute((__unused__)))
922 {
923 	lzma_stream_coder *coder = coder_ptr;
924 
925 	// Applications shouldn't attempt to change the options when
926 	// we are already encoding the Index or Stream Footer.
927 	if (coder->sequence > SEQ_BLOCK)
928 		return LZMA_PROG_ERROR;
929 
930 	// For now the threaded encoder doesn't support changing
931 	// the options in the middle of a Block.
932 	if (coder->thr != NULL)
933 		return LZMA_PROG_ERROR;
934 
935 	// Check if the filter chain seems mostly valid. See the comment
936 	// in stream_encoder_mt_init().
937 	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
938 		return LZMA_OPTIONS_ERROR;
939 
940 	// Make a copy to a temporary buffer first. This way the encoder
941 	// state stays unchanged if an error occurs in lzma_filters_copy().
942 	lzma_filter temp[LZMA_FILTERS_MAX + 1];
943 	return_if_error(lzma_filters_copy(filters, temp, allocator));
944 
945 	// Free the options of the old chain as well as the cache.
946 	lzma_filters_free(coder->filters, allocator);
947 	lzma_filters_free(coder->filters_cache, allocator);
948 
949 	// Copy the new filter chain in place.
950 	memcpy(coder->filters, temp, sizeof(temp));
951 
952 	return LZMA_OK;
953 }
954 
955 
956 /// Options handling for lzma_stream_encoder_mt_init() and
957 /// lzma_stream_encoder_mt_memusage()
958 static lzma_ret
959 get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
960 		const lzma_filter **filters, uint64_t *block_size,
961 		uint64_t *outbuf_size_max)
962 {
963 	// Validate some of the options.
964 	if (options == NULL)
965 		return LZMA_PROG_ERROR;
966 
967 	if (options->flags != 0 || options->threads == 0
968 			|| options->threads > LZMA_THREADS_MAX)
969 		return LZMA_OPTIONS_ERROR;
970 
971 	if (options->filters != NULL) {
972 		// Filter chain was given, use it as is.
973 		*filters = options->filters;
974 	} else {
975 		// Use a preset.
976 		if (lzma_easy_preset(opt_easy, options->preset))
977 			return LZMA_OPTIONS_ERROR;
978 
979 		*filters = opt_easy->filters;
980 	}
981 
982 	// Block size
983 	if (options->block_size > 0) {
984 		if (options->block_size > BLOCK_SIZE_MAX)
985 			return LZMA_OPTIONS_ERROR;
986 
987 		*block_size = options->block_size;
988 	} else {
989 		// Determine the Block size from the filter chain.
990 		*block_size = lzma_mt_block_size(*filters);
991 		if (*block_size == 0)
992 			return LZMA_OPTIONS_ERROR;
993 
994 		assert(*block_size <= BLOCK_SIZE_MAX);
995 	}
996 
997 	// Calculate the maximum amount output that a single output buffer
998 	// may need to hold. This is the same as the maximum total size of
999 	// a Block.
1000 	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
1001 	if (*outbuf_size_max == 0)
1002 		return LZMA_MEM_ERROR;
1003 
1004 	return LZMA_OK;
1005 }
1006 
1007 
1008 static void
1009 get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
1010 {
1011 	lzma_stream_coder *coder = coder_ptr;
1012 
1013 	// Lock coder->mutex to prevent finishing threads from moving their
1014 	// progress info from the worker_thread structure to lzma_stream_coder.
1015 	mythread_sync(coder->mutex) {
1016 		*progress_in = coder->progress_in;
1017 		*progress_out = coder->progress_out;
1018 
1019 		for (size_t i = 0; i < coder->threads_initialized; ++i) {
1020 			mythread_sync(coder->threads[i].mutex) {
1021 				*progress_in += coder->threads[i].progress_in;
1022 				*progress_out += coder->threads[i]
1023 						.progress_out;
1024 			}
1025 		}
1026 	}
1027 
1028 	return;
1029 }
1030 
1031 
1032 static lzma_ret
1033 stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
1034 		const lzma_mt *options)
1035 {
1036 	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
1037 
1038 	// Get the filter chain.
1039 	lzma_options_easy easy;
1040 	const lzma_filter *filters;
1041 	uint64_t block_size;
1042 	uint64_t outbuf_size_max;
1043 	return_if_error(get_options(options, &easy, &filters,
1044 			&block_size, &outbuf_size_max));
1045 
1046 #if SIZE_MAX < UINT64_MAX
1047 	if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
1048 		return LZMA_MEM_ERROR;
1049 #endif
1050 
1051 	// Validate the filter chain so that we can give an error in this
1052 	// function instead of delaying it to the first call to lzma_code().
1053 	// The memory usage calculation verifies the filter chain as
1054 	// a side effect so we take advantage of that. It's not a perfect
1055 	// check though as raw encoder allows LZMA1 too but such problems
1056 	// will be caught eventually with Block Header encoder.
1057 	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
1058 		return LZMA_OPTIONS_ERROR;
1059 
1060 	// Validate the Check ID.
1061 	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
1062 		return LZMA_PROG_ERROR;
1063 
1064 	if (!lzma_check_is_supported(options->check))
1065 		return LZMA_UNSUPPORTED_CHECK;
1066 
1067 	// Allocate and initialize the base structure if needed.
1068 	lzma_stream_coder *coder = next->coder;
1069 	if (coder == NULL) {
1070 		coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
1071 		if (coder == NULL)
1072 			return LZMA_MEM_ERROR;
1073 
1074 		next->coder = coder;
1075 
1076 		// For the mutex and condition variable initializations
1077 		// the error handling has to be done here because
1078 		// stream_encoder_mt_end() doesn't know if they have
1079 		// already been initialized or not.
1080 		if (mythread_mutex_init(&coder->mutex)) {
1081 			lzma_free(coder, allocator);
1082 			next->coder = NULL;
1083 			return LZMA_MEM_ERROR;
1084 		}
1085 
1086 		if (mythread_cond_init(&coder->cond)) {
1087 			mythread_mutex_destroy(&coder->mutex);
1088 			lzma_free(coder, allocator);
1089 			next->coder = NULL;
1090 			return LZMA_MEM_ERROR;
1091 		}
1092 
1093 		next->code = &stream_encode_mt;
1094 		next->end = &stream_encoder_mt_end;
1095 		next->get_progress = &get_progress;
1096 		next->update = &stream_encoder_mt_update;
1097 
1098 		coder->filters[0].id = LZMA_VLI_UNKNOWN;
1099 		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
1100 		coder->index_encoder = LZMA_NEXT_CODER_INIT;
1101 		coder->index = NULL;
1102 		memzero(&coder->outq, sizeof(coder->outq));
1103 		coder->threads = NULL;
1104 		coder->threads_max = 0;
1105 		coder->threads_initialized = 0;
1106 	}
1107 
1108 	// Basic initializations
1109 	coder->sequence = SEQ_STREAM_HEADER;
1110 	coder->block_size = (size_t)(block_size);
1111 	coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
1112 	coder->thread_error = LZMA_OK;
1113 	coder->thr = NULL;
1114 
1115 	// Allocate the thread-specific base structures.
1116 	assert(options->threads > 0);
1117 	if (coder->threads_max != options->threads) {
1118 		threads_end(coder, allocator);
1119 
1120 		coder->threads = NULL;
1121 		coder->threads_max = 0;
1122 
1123 		coder->threads_initialized = 0;
1124 		coder->threads_free = NULL;
1125 
1126 		coder->threads = lzma_alloc(
1127 				options->threads * sizeof(worker_thread),
1128 				allocator);
1129 		if (coder->threads == NULL)
1130 			return LZMA_MEM_ERROR;
1131 
1132 		coder->threads_max = options->threads;
1133 	} else {
1134 		// Reuse the old structures and threads. Tell the running
1135 		// threads to stop and wait until they have stopped.
1136 		threads_stop(coder, true);
1137 	}
1138 
1139 	// Output queue
1140 	return_if_error(lzma_outq_init(&coder->outq, allocator,
1141 			options->threads));
1142 
1143 	// Timeout
1144 	coder->timeout = options->timeout;
1145 
1146 	// Free the old filter chain and the cache.
1147 	lzma_filters_free(coder->filters, allocator);
1148 	lzma_filters_free(coder->filters_cache, allocator);
1149 
1150 	// Copy the new filter chain.
1151 	return_if_error(lzma_filters_copy(
1152 			filters, coder->filters, allocator));
1153 
1154 	// Index
1155 	lzma_index_end(coder->index, allocator);
1156 	coder->index = lzma_index_init(allocator);
1157 	if (coder->index == NULL)
1158 		return LZMA_MEM_ERROR;
1159 
1160 	// Stream Header
1161 	coder->stream_flags.version = 0;
1162 	coder->stream_flags.check = options->check;
1163 	return_if_error(lzma_stream_header_encode(
1164 			&coder->stream_flags, coder->header));
1165 
1166 	coder->header_pos = 0;
1167 
1168 	// Progress info
1169 	coder->progress_in = 0;
1170 	coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1171 
1172 	return LZMA_OK;
1173 }
1174 
1175 
1176 #ifdef HAVE_SYMBOL_VERSIONS_LINUX
1177 // These are for compatibility with binaries linked against liblzma that
1178 // has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
1179 // Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2
1180 // but it has been added here anyway since someone might misread the
1181 // RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist.
1182 LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha",
1183 	lzma_ret, lzma_stream_encoder_mt_512a)(
1184 		lzma_stream *strm, const lzma_mt *options)
1185 		lzma_nothrow lzma_attr_warn_unused_result
1186 		__attribute__((__alias__("lzma_stream_encoder_mt_52")));
1187 
1188 LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2",
1189 	lzma_ret, lzma_stream_encoder_mt_522)(
1190 		lzma_stream *strm, const lzma_mt *options)
1191 		lzma_nothrow lzma_attr_warn_unused_result
1192 		__attribute__((__alias__("lzma_stream_encoder_mt_52")));
1193 
1194 LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2",
1195 	lzma_ret, lzma_stream_encoder_mt_52)(
1196 		lzma_stream *strm, const lzma_mt *options)
1197 		lzma_nothrow lzma_attr_warn_unused_result;
1198 
1199 #define lzma_stream_encoder_mt lzma_stream_encoder_mt_52
1200 #endif
1201 extern LZMA_API(lzma_ret)
1202 lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1203 {
1204 	lzma_next_strm_init(stream_encoder_mt_init, strm, options);
1205 
1206 	strm->internal->supported_actions[LZMA_RUN] = true;
1207 // 	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1208 	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
1209 	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
1210 	strm->internal->supported_actions[LZMA_FINISH] = true;
1211 
1212 	return LZMA_OK;
1213 }
1214 
1215 
1216 #ifdef HAVE_SYMBOL_VERSIONS_LINUX
1217 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha",
1218 	uint64_t, lzma_stream_encoder_mt_memusage_512a)(
1219 	const lzma_mt *options) lzma_nothrow lzma_attr_pure
1220 	__attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
1221 
1222 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2",
1223 	uint64_t, lzma_stream_encoder_mt_memusage_522)(
1224 	const lzma_mt *options) lzma_nothrow lzma_attr_pure
1225 	__attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
1226 
1227 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2",
1228 	uint64_t, lzma_stream_encoder_mt_memusage_52)(
1229 	const lzma_mt *options) lzma_nothrow lzma_attr_pure;
1230 
1231 #define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52
1232 #endif
1233 // This function name is a monster but it's consistent with the older
1234 // monster names. :-( 31 chars is the max that C99 requires so in that
1235 // sense it's not too long. ;-)
1236 extern LZMA_API(uint64_t)
1237 lzma_stream_encoder_mt_memusage(const lzma_mt *options)
1238 {
1239 	lzma_options_easy easy;
1240 	const lzma_filter *filters;
1241 	uint64_t block_size;
1242 	uint64_t outbuf_size_max;
1243 
1244 	if (get_options(options, &easy, &filters, &block_size,
1245 			&outbuf_size_max) != LZMA_OK)
1246 		return UINT64_MAX;
1247 
1248 	// Memory usage of the input buffers
1249 	const uint64_t inbuf_memusage = options->threads * block_size;
1250 
1251 	// Memory usage of the filter encoders
1252 	uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1253 	if (filters_memusage == UINT64_MAX)
1254 		return UINT64_MAX;
1255 
1256 	filters_memusage *= options->threads;
1257 
1258 	// Memory usage of the output queue
1259 	const uint64_t outq_memusage = lzma_outq_memusage(
1260 			outbuf_size_max, options->threads);
1261 	if (outq_memusage == UINT64_MAX)
1262 		return UINT64_MAX;
1263 
1264 	// Sum them with overflow checking.
1265 	uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1266 			+ sizeof(lzma_stream_coder)
1267 			+ options->threads * sizeof(worker_thread);
1268 
1269 	if (UINT64_MAX - total_memusage < inbuf_memusage)
1270 		return UINT64_MAX;
1271 
1272 	total_memusage += inbuf_memusage;
1273 
1274 	if (UINT64_MAX - total_memusage < filters_memusage)
1275 		return UINT64_MAX;
1276 
1277 	total_memusage += filters_memusage;
1278 
1279 	if (UINT64_MAX - total_memusage < outq_memusage)
1280 		return UINT64_MAX;
1281 
1282 	return total_memusage + outq_memusage;
1283 }
1284