xref: /freebsd/contrib/xz/src/liblzma/common/stream_encoder_mt.c (revision f7c32ed617858bcd22f8d1b03199099d50125721)
1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 /// \file       stream_encoder_mt.c
4 /// \brief      Multithreaded .xz Stream encoder
5 //
6 //  Author:     Lasse Collin
7 //
8 //  This file has been put into the public domain.
9 //  You can do whatever you want with this file.
10 //
11 ///////////////////////////////////////////////////////////////////////////////
12 
13 #include "filter_encoder.h"
14 #include "easy_preset.h"
15 #include "block_encoder.h"
16 #include "block_buffer_encoder.h"
17 #include "index_encoder.h"
18 #include "outqueue.h"
19 
20 
21 /// Maximum supported block size. This makes it simpler to prevent integer
22 /// overflows if we are given unusually large block size.
23 #define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
24 
25 
26 typedef enum {
27 	/// Waiting for work.
28 	THR_IDLE,
29 
30 	/// Encoding is in progress.
31 	THR_RUN,
32 
33 	/// Encoding is in progress but no more input data will
34 	/// be read.
35 	THR_FINISH,
36 
37 	/// The main thread wants the thread to stop whatever it was doing
38 	/// but not exit.
39 	THR_STOP,
40 
41 	/// The main thread wants the thread to exit. We could use
42 	/// cancellation but since there's stopped anyway, this is lazier.
43 	THR_EXIT,
44 
45 } worker_state;
46 
47 typedef struct lzma_stream_coder_s lzma_stream_coder;
48 
49 typedef struct worker_thread_s worker_thread;
50 struct worker_thread_s {
51 	worker_state state;
52 
53 	/// Input buffer of coder->block_size bytes. The main thread will
54 	/// put new input into this and update in_size accordingly. Once
55 	/// no more input is coming, state will be set to THR_FINISH.
56 	uint8_t *in;
57 
58 	/// Amount of data available in the input buffer. This is modified
59 	/// only by the main thread.
60 	size_t in_size;
61 
62 	/// Output buffer for this thread. This is set by the main
63 	/// thread every time a new Block is started with this thread
64 	/// structure.
65 	lzma_outbuf *outbuf;
66 
67 	/// Pointer to the main structure is needed when putting this
68 	/// thread back to the stack of free threads.
69 	lzma_stream_coder *coder;
70 
71 	/// The allocator is set by the main thread. Since a copy of the
72 	/// pointer is kept here, the application must not change the
73 	/// allocator before calling lzma_end().
74 	const lzma_allocator *allocator;
75 
76 	/// Amount of uncompressed data that has already been compressed.
77 	uint64_t progress_in;
78 
79 	/// Amount of compressed data that is ready.
80 	uint64_t progress_out;
81 
82 	/// Block encoder
83 	lzma_next_coder block_encoder;
84 
85 	/// Compression options for this Block
86 	lzma_block block_options;
87 
88 	/// Next structure in the stack of free worker threads.
89 	worker_thread *next;
90 
91 	mythread_mutex mutex;
92 	mythread_cond cond;
93 
94 	/// The ID of this thread is used to join the thread
95 	/// when it's not needed anymore.
96 	mythread thread_id;
97 };
98 
99 
100 struct lzma_stream_coder_s {
101 	enum {
102 		SEQ_STREAM_HEADER,
103 		SEQ_BLOCK,
104 		SEQ_INDEX,
105 		SEQ_STREAM_FOOTER,
106 	} sequence;
107 
108 	/// Start a new Block every block_size bytes of input unless
109 	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
110 	size_t block_size;
111 
112 	/// The filter chain currently in use
113 	lzma_filter filters[LZMA_FILTERS_MAX + 1];
114 
115 
116 	/// Index to hold sizes of the Blocks
117 	lzma_index *index;
118 
119 	/// Index encoder
120 	lzma_next_coder index_encoder;
121 
122 
123 	/// Stream Flags for encoding the Stream Header and Stream Footer.
124 	lzma_stream_flags stream_flags;
125 
126 	/// Buffer to hold Stream Header and Stream Footer.
127 	uint8_t header[LZMA_STREAM_HEADER_SIZE];
128 
129 	/// Read position in header[]
130 	size_t header_pos;
131 
132 
133 	/// Output buffer queue for compressed data
134 	lzma_outq outq;
135 
136 
137 	/// Maximum wait time if cannot use all the input and cannot
138 	/// fill the output buffer. This is in milliseconds.
139 	uint32_t timeout;
140 
141 
142 	/// Error code from a worker thread
143 	lzma_ret thread_error;
144 
145 	/// Array of allocated thread-specific structures
146 	worker_thread *threads;
147 
148 	/// Number of structures in "threads" above. This is also the
149 	/// number of threads that will be created at maximum.
150 	uint32_t threads_max;
151 
152 	/// Number of thread structures that have been initialized, and
153 	/// thus the number of worker threads actually created so far.
154 	uint32_t threads_initialized;
155 
156 	/// Stack of free threads. When a thread finishes, it puts itself
157 	/// back into this stack. This starts as empty because threads
158 	/// are created only when actually needed.
159 	worker_thread *threads_free;
160 
161 	/// The most recent worker thread to which the main thread writes
162 	/// the new input from the application.
163 	worker_thread *thr;
164 
165 
166 	/// Amount of uncompressed data in Blocks that have already
167 	/// been finished.
168 	uint64_t progress_in;
169 
170 	/// Amount of compressed data in Stream Header + Blocks that
171 	/// have already been finished.
172 	uint64_t progress_out;
173 
174 
175 	mythread_mutex mutex;
176 	mythread_cond cond;
177 };
178 
179 
180 /// Tell the main thread that something has gone wrong.
181 static void
182 worker_error(worker_thread *thr, lzma_ret ret)
183 {
184 	assert(ret != LZMA_OK);
185 	assert(ret != LZMA_STREAM_END);
186 
187 	mythread_sync(thr->coder->mutex) {
188 		if (thr->coder->thread_error == LZMA_OK)
189 			thr->coder->thread_error = ret;
190 
191 		mythread_cond_signal(&thr->coder->cond);
192 	}
193 
194 	return;
195 }
196 
197 
198 static worker_state
199 worker_encode(worker_thread *thr, worker_state state)
200 {
201 	assert(thr->progress_in == 0);
202 	assert(thr->progress_out == 0);
203 
204 	// Set the Block options.
205 	thr->block_options = (lzma_block){
206 		.version = 0,
207 		.check = thr->coder->stream_flags.check,
208 		.compressed_size = thr->coder->outq.buf_size_max,
209 		.uncompressed_size = thr->coder->block_size,
210 
211 		// TODO: To allow changing the filter chain, the filters
212 		// array must be copied to each worker_thread.
213 		.filters = thr->coder->filters,
214 	};
215 
216 	// Calculate maximum size of the Block Header. This amount is
217 	// reserved in the beginning of the buffer so that Block Header
218 	// along with Compressed Size and Uncompressed Size can be
219 	// written there.
220 	lzma_ret ret = lzma_block_header_size(&thr->block_options);
221 	if (ret != LZMA_OK) {
222 		worker_error(thr, ret);
223 		return THR_STOP;
224 	}
225 
226 	// Initialize the Block encoder.
227 	ret = lzma_block_encoder_init(&thr->block_encoder,
228 			thr->allocator, &thr->block_options);
229 	if (ret != LZMA_OK) {
230 		worker_error(thr, ret);
231 		return THR_STOP;
232 	}
233 
234 	size_t in_pos = 0;
235 	size_t in_size = 0;
236 
237 	thr->outbuf->size = thr->block_options.header_size;
238 	const size_t out_size = thr->coder->outq.buf_size_max;
239 
240 	do {
241 		mythread_sync(thr->mutex) {
242 			// Store in_pos and out_pos into *thr so that
243 			// an application may read them via
244 			// lzma_get_progress() to get progress information.
245 			//
246 			// NOTE: These aren't updated when the encoding
247 			// finishes. Instead, the final values are taken
248 			// later from thr->outbuf.
249 			thr->progress_in = in_pos;
250 			thr->progress_out = thr->outbuf->size;
251 
252 			while (in_size == thr->in_size
253 					&& thr->state == THR_RUN)
254 				mythread_cond_wait(&thr->cond, &thr->mutex);
255 
256 			state = thr->state;
257 			in_size = thr->in_size;
258 		}
259 
260 		// Return if we were asked to stop or exit.
261 		if (state >= THR_STOP)
262 			return state;
263 
264 		lzma_action action = state == THR_FINISH
265 				? LZMA_FINISH : LZMA_RUN;
266 
267 		// Limit the amount of input given to the Block encoder
268 		// at once. This way this thread can react fairly quickly
269 		// if the main thread wants us to stop or exit.
270 		static const size_t in_chunk_max = 16384;
271 		size_t in_limit = in_size;
272 		if (in_size - in_pos > in_chunk_max) {
273 			in_limit = in_pos + in_chunk_max;
274 			action = LZMA_RUN;
275 		}
276 
277 		ret = thr->block_encoder.code(
278 				thr->block_encoder.coder, thr->allocator,
279 				thr->in, &in_pos, in_limit, thr->outbuf->buf,
280 				&thr->outbuf->size, out_size, action);
281 	} while (ret == LZMA_OK && thr->outbuf->size < out_size);
282 
283 	switch (ret) {
284 	case LZMA_STREAM_END:
285 		assert(state == THR_FINISH);
286 
287 		// Encode the Block Header. By doing it after
288 		// the compression, we can store the Compressed Size
289 		// and Uncompressed Size fields.
290 		ret = lzma_block_header_encode(&thr->block_options,
291 				thr->outbuf->buf);
292 		if (ret != LZMA_OK) {
293 			worker_error(thr, ret);
294 			return THR_STOP;
295 		}
296 
297 		break;
298 
299 	case LZMA_OK:
300 		// The data was incompressible. Encode it using uncompressed
301 		// LZMA2 chunks.
302 		//
303 		// First wait that we have gotten all the input.
304 		mythread_sync(thr->mutex) {
305 			while (thr->state == THR_RUN)
306 				mythread_cond_wait(&thr->cond, &thr->mutex);
307 
308 			state = thr->state;
309 			in_size = thr->in_size;
310 		}
311 
312 		if (state >= THR_STOP)
313 			return state;
314 
315 		// Do the encoding. This takes care of the Block Header too.
316 		thr->outbuf->size = 0;
317 		ret = lzma_block_uncomp_encode(&thr->block_options,
318 				thr->in, in_size, thr->outbuf->buf,
319 				&thr->outbuf->size, out_size);
320 
321 		// It shouldn't fail.
322 		if (ret != LZMA_OK) {
323 			worker_error(thr, LZMA_PROG_ERROR);
324 			return THR_STOP;
325 		}
326 
327 		break;
328 
329 	default:
330 		worker_error(thr, ret);
331 		return THR_STOP;
332 	}
333 
334 	// Set the size information that will be read by the main thread
335 	// to write the Index field.
336 	thr->outbuf->unpadded_size
337 			= lzma_block_unpadded_size(&thr->block_options);
338 	assert(thr->outbuf->unpadded_size != 0);
339 	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
340 
341 	return THR_FINISH;
342 }
343 
344 
345 static MYTHREAD_RET_TYPE
346 worker_start(void *thr_ptr)
347 {
348 	worker_thread *thr = thr_ptr;
349 	worker_state state = THR_IDLE; // Init to silence a warning
350 
351 	while (true) {
352 		// Wait for work.
353 		mythread_sync(thr->mutex) {
354 			while (true) {
355 				// The thread is already idle so if we are
356 				// requested to stop, just set the state.
357 				if (thr->state == THR_STOP) {
358 					thr->state = THR_IDLE;
359 					mythread_cond_signal(&thr->cond);
360 				}
361 
362 				state = thr->state;
363 				if (state != THR_IDLE)
364 					break;
365 
366 				mythread_cond_wait(&thr->cond, &thr->mutex);
367 			}
368 		}
369 
370 		assert(state != THR_IDLE);
371 		assert(state != THR_STOP);
372 
373 		if (state <= THR_FINISH)
374 			state = worker_encode(thr, state);
375 
376 		if (state == THR_EXIT)
377 			break;
378 
379 		// Mark the thread as idle unless the main thread has
380 		// told us to exit. Signal is needed for the case
381 		// where the main thread is waiting for the threads to stop.
382 		mythread_sync(thr->mutex) {
383 			if (thr->state != THR_EXIT) {
384 				thr->state = THR_IDLE;
385 				mythread_cond_signal(&thr->cond);
386 			}
387 		}
388 
389 		mythread_sync(thr->coder->mutex) {
390 			// Mark the output buffer as finished if
391 			// no errors occurred.
392 			thr->outbuf->finished = state == THR_FINISH;
393 
394 			// Update the main progress info.
395 			thr->coder->progress_in
396 					+= thr->outbuf->uncompressed_size;
397 			thr->coder->progress_out += thr->outbuf->size;
398 			thr->progress_in = 0;
399 			thr->progress_out = 0;
400 
401 			// Return this thread to the stack of free threads.
402 			thr->next = thr->coder->threads_free;
403 			thr->coder->threads_free = thr;
404 
405 			mythread_cond_signal(&thr->coder->cond);
406 		}
407 	}
408 
409 	// Exiting, free the resources.
410 	mythread_mutex_destroy(&thr->mutex);
411 	mythread_cond_destroy(&thr->cond);
412 
413 	lzma_next_end(&thr->block_encoder, thr->allocator);
414 	lzma_free(thr->in, thr->allocator);
415 	return MYTHREAD_RET_VALUE;
416 }
417 
418 
419 /// Make the threads stop but not exit. Optionally wait for them to stop.
420 static void
421 threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
422 {
423 	// Tell the threads to stop.
424 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
425 		mythread_sync(coder->threads[i].mutex) {
426 			coder->threads[i].state = THR_STOP;
427 			mythread_cond_signal(&coder->threads[i].cond);
428 		}
429 	}
430 
431 	if (!wait_for_threads)
432 		return;
433 
434 	// Wait for the threads to settle in the idle state.
435 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
436 		mythread_sync(coder->threads[i].mutex) {
437 			while (coder->threads[i].state != THR_IDLE)
438 				mythread_cond_wait(&coder->threads[i].cond,
439 						&coder->threads[i].mutex);
440 		}
441 	}
442 
443 	return;
444 }
445 
446 
447 /// Stop the threads and free the resources associated with them.
448 /// Wait until the threads have exited.
449 static void
450 threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
451 {
452 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
453 		mythread_sync(coder->threads[i].mutex) {
454 			coder->threads[i].state = THR_EXIT;
455 			mythread_cond_signal(&coder->threads[i].cond);
456 		}
457 	}
458 
459 	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
460 		int ret = mythread_join(coder->threads[i].thread_id);
461 		assert(ret == 0);
462 		(void)ret;
463 	}
464 
465 	lzma_free(coder->threads, allocator);
466 	return;
467 }
468 
469 
470 /// Initialize a new worker_thread structure and create a new thread.
471 static lzma_ret
472 initialize_new_thread(lzma_stream_coder *coder,
473 		const lzma_allocator *allocator)
474 {
475 	worker_thread *thr = &coder->threads[coder->threads_initialized];
476 
477 	thr->in = lzma_alloc(coder->block_size, allocator);
478 	if (thr->in == NULL)
479 		return LZMA_MEM_ERROR;
480 
481 	if (mythread_mutex_init(&thr->mutex))
482 		goto error_mutex;
483 
484 	if (mythread_cond_init(&thr->cond))
485 		goto error_cond;
486 
487 	thr->state = THR_IDLE;
488 	thr->allocator = allocator;
489 	thr->coder = coder;
490 	thr->progress_in = 0;
491 	thr->progress_out = 0;
492 	thr->block_encoder = LZMA_NEXT_CODER_INIT;
493 
494 	if (mythread_create(&thr->thread_id, &worker_start, thr))
495 		goto error_thread;
496 
497 	++coder->threads_initialized;
498 	coder->thr = thr;
499 
500 	return LZMA_OK;
501 
502 error_thread:
503 	mythread_cond_destroy(&thr->cond);
504 
505 error_cond:
506 	mythread_mutex_destroy(&thr->mutex);
507 
508 error_mutex:
509 	lzma_free(thr->in, allocator);
510 	return LZMA_MEM_ERROR;
511 }
512 
513 
514 static lzma_ret
515 get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
516 {
517 	// If there are no free output subqueues, there is no
518 	// point to try getting a thread.
519 	if (!lzma_outq_has_buf(&coder->outq))
520 		return LZMA_OK;
521 
522 	// If there is a free structure on the stack, use it.
523 	mythread_sync(coder->mutex) {
524 		if (coder->threads_free != NULL) {
525 			coder->thr = coder->threads_free;
526 			coder->threads_free = coder->threads_free->next;
527 		}
528 	}
529 
530 	if (coder->thr == NULL) {
531 		// If there are no uninitialized structures left, return.
532 		if (coder->threads_initialized == coder->threads_max)
533 			return LZMA_OK;
534 
535 		// Initialize a new thread.
536 		return_if_error(initialize_new_thread(coder, allocator));
537 	}
538 
539 	// Reset the parts of the thread state that have to be done
540 	// in the main thread.
541 	mythread_sync(coder->thr->mutex) {
542 		coder->thr->state = THR_RUN;
543 		coder->thr->in_size = 0;
544 		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
545 		mythread_cond_signal(&coder->thr->cond);
546 	}
547 
548 	return LZMA_OK;
549 }
550 
551 
552 static lzma_ret
553 stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
554 		const uint8_t *restrict in, size_t *restrict in_pos,
555 		size_t in_size, lzma_action action)
556 {
557 	while (*in_pos < in_size
558 			|| (coder->thr != NULL && action != LZMA_RUN)) {
559 		if (coder->thr == NULL) {
560 			// Get a new thread.
561 			const lzma_ret ret = get_thread(coder, allocator);
562 			if (coder->thr == NULL)
563 				return ret;
564 		}
565 
566 		// Copy the input data to thread's buffer.
567 		size_t thr_in_size = coder->thr->in_size;
568 		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
569 				&thr_in_size, coder->block_size);
570 
571 		// Tell the Block encoder to finish if
572 		//  - it has got block_size bytes of input; or
573 		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
574 		//    or LZMA_FULL_BARRIER was used.
575 		//
576 		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
577 		const bool finish = thr_in_size == coder->block_size
578 				|| (*in_pos == in_size && action != LZMA_RUN);
579 
580 		bool block_error = false;
581 
582 		mythread_sync(coder->thr->mutex) {
583 			if (coder->thr->state == THR_IDLE) {
584 				// Something has gone wrong with the Block
585 				// encoder. It has set coder->thread_error
586 				// which we will read a few lines later.
587 				block_error = true;
588 			} else {
589 				// Tell the Block encoder its new amount
590 				// of input and update the state if needed.
591 				coder->thr->in_size = thr_in_size;
592 
593 				if (finish)
594 					coder->thr->state = THR_FINISH;
595 
596 				mythread_cond_signal(&coder->thr->cond);
597 			}
598 		}
599 
600 		if (block_error) {
601 			lzma_ret ret;
602 
603 			mythread_sync(coder->mutex) {
604 				ret = coder->thread_error;
605 			}
606 
607 			return ret;
608 		}
609 
610 		if (finish)
611 			coder->thr = NULL;
612 	}
613 
614 	return LZMA_OK;
615 }
616 
617 
618 /// Wait until more input can be consumed, more output can be read, or
619 /// an optional timeout is reached.
620 static bool
621 wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
622 		bool *has_blocked, bool has_input)
623 {
624 	if (coder->timeout != 0 && !*has_blocked) {
625 		// Every time when stream_encode_mt() is called via
626 		// lzma_code(), *has_blocked starts as false. We set it
627 		// to true here and calculate the absolute time when
628 		// we must return if there's nothing to do.
629 		//
630 		// The idea of *has_blocked is to avoid unneeded calls
631 		// to mythread_condtime_set(), which may do a syscall
632 		// depending on the operating system.
633 		*has_blocked = true;
634 		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
635 	}
636 
637 	bool timed_out = false;
638 
639 	mythread_sync(coder->mutex) {
640 		// There are four things that we wait. If one of them
641 		// becomes possible, we return.
642 		//  - If there is input left, we need to get a free
643 		//    worker thread and an output buffer for it.
644 		//  - Data ready to be read from the output queue.
645 		//  - A worker thread indicates an error.
646 		//  - Time out occurs.
647 		while ((!has_input || coder->threads_free == NULL
648 					|| !lzma_outq_has_buf(&coder->outq))
649 				&& !lzma_outq_is_readable(&coder->outq)
650 				&& coder->thread_error == LZMA_OK
651 				&& !timed_out) {
652 			if (coder->timeout != 0)
653 				timed_out = mythread_cond_timedwait(
654 						&coder->cond, &coder->mutex,
655 						wait_abs) != 0;
656 			else
657 				mythread_cond_wait(&coder->cond,
658 						&coder->mutex);
659 		}
660 	}
661 
662 	return timed_out;
663 }
664 
665 
666 static lzma_ret
667 stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
668 		const uint8_t *restrict in, size_t *restrict in_pos,
669 		size_t in_size, uint8_t *restrict out,
670 		size_t *restrict out_pos, size_t out_size, lzma_action action)
671 {
672 	lzma_stream_coder *coder = coder_ptr;
673 
674 	switch (coder->sequence) {
675 	case SEQ_STREAM_HEADER:
676 		lzma_bufcpy(coder->header, &coder->header_pos,
677 				sizeof(coder->header),
678 				out, out_pos, out_size);
679 		if (coder->header_pos < sizeof(coder->header))
680 			return LZMA_OK;
681 
682 		coder->header_pos = 0;
683 		coder->sequence = SEQ_BLOCK;
684 
685 	// Fall through
686 
687 	case SEQ_BLOCK: {
688 		// Initialized to silence warnings.
689 		lzma_vli unpadded_size = 0;
690 		lzma_vli uncompressed_size = 0;
691 		lzma_ret ret = LZMA_OK;
692 
693 		// These are for wait_for_work().
694 		bool has_blocked = false;
695 		mythread_condtime wait_abs;
696 
697 		while (true) {
698 			mythread_sync(coder->mutex) {
699 				// Check for Block encoder errors.
700 				ret = coder->thread_error;
701 				if (ret != LZMA_OK) {
702 					assert(ret != LZMA_STREAM_END);
703 					break; // Break out of mythread_sync.
704 				}
705 
706 				// Try to read compressed data to out[].
707 				ret = lzma_outq_read(&coder->outq,
708 						out, out_pos, out_size,
709 						&unpadded_size,
710 						&uncompressed_size);
711 			}
712 
713 			if (ret == LZMA_STREAM_END) {
714 				// End of Block. Add it to the Index.
715 				ret = lzma_index_append(coder->index,
716 						allocator, unpadded_size,
717 						uncompressed_size);
718 
719 				// If we didn't fill the output buffer yet,
720 				// try to read more data. Maybe the next
721 				// outbuf has been finished already too.
722 				if (*out_pos < out_size)
723 					continue;
724 			}
725 
726 			if (ret != LZMA_OK) {
727 				// coder->thread_error was set or
728 				// lzma_index_append() failed.
729 				threads_stop(coder, false);
730 				return ret;
731 			}
732 
733 			// Try to give uncompressed data to a worker thread.
734 			ret = stream_encode_in(coder, allocator,
735 					in, in_pos, in_size, action);
736 			if (ret != LZMA_OK) {
737 				threads_stop(coder, false);
738 				return ret;
739 			}
740 
741 			// See if we should wait or return.
742 			//
743 			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
744 			if (*in_pos == in_size) {
745 				// LZMA_RUN: More data is probably coming
746 				// so return to let the caller fill the
747 				// input buffer.
748 				if (action == LZMA_RUN)
749 					return LZMA_OK;
750 
751 				// LZMA_FULL_BARRIER: The same as with
752 				// LZMA_RUN but tell the caller that the
753 				// barrier was completed.
754 				if (action == LZMA_FULL_BARRIER)
755 					return LZMA_STREAM_END;
756 
757 				// Finishing or flushing isn't completed until
758 				// all input data has been encoded and copied
759 				// to the output buffer.
760 				if (lzma_outq_is_empty(&coder->outq)) {
761 					// LZMA_FINISH: Continue to encode
762 					// the Index field.
763 					if (action == LZMA_FINISH)
764 						break;
765 
766 					// LZMA_FULL_FLUSH: Return to tell
767 					// the caller that flushing was
768 					// completed.
769 					if (action == LZMA_FULL_FLUSH)
770 						return LZMA_STREAM_END;
771 				}
772 			}
773 
774 			// Return if there is no output space left.
775 			// This check must be done after testing the input
776 			// buffer, because we might want to use a different
777 			// return code.
778 			if (*out_pos == out_size)
779 				return LZMA_OK;
780 
781 			// Neither in nor out has been used completely.
782 			// Wait until there's something we can do.
783 			if (wait_for_work(coder, &wait_abs, &has_blocked,
784 					*in_pos < in_size))
785 				return LZMA_TIMED_OUT;
786 		}
787 
788 		// All Blocks have been encoded and the threads have stopped.
789 		// Prepare to encode the Index field.
790 		return_if_error(lzma_index_encoder_init(
791 				&coder->index_encoder, allocator,
792 				coder->index));
793 		coder->sequence = SEQ_INDEX;
794 
795 		// Update the progress info to take the Index and
796 		// Stream Footer into account. Those are very fast to encode
797 		// so in terms of progress information they can be thought
798 		// to be ready to be copied out.
799 		coder->progress_out += lzma_index_size(coder->index)
800 				+ LZMA_STREAM_HEADER_SIZE;
801 	}
802 
803 	// Fall through
804 
805 	case SEQ_INDEX: {
806 		// Call the Index encoder. It doesn't take any input, so
807 		// those pointers can be NULL.
808 		const lzma_ret ret = coder->index_encoder.code(
809 				coder->index_encoder.coder, allocator,
810 				NULL, NULL, 0,
811 				out, out_pos, out_size, LZMA_RUN);
812 		if (ret != LZMA_STREAM_END)
813 			return ret;
814 
815 		// Encode the Stream Footer into coder->buffer.
816 		coder->stream_flags.backward_size
817 				= lzma_index_size(coder->index);
818 		if (lzma_stream_footer_encode(&coder->stream_flags,
819 				coder->header) != LZMA_OK)
820 			return LZMA_PROG_ERROR;
821 
822 		coder->sequence = SEQ_STREAM_FOOTER;
823 	}
824 
825 	// Fall through
826 
827 	case SEQ_STREAM_FOOTER:
828 		lzma_bufcpy(coder->header, &coder->header_pos,
829 				sizeof(coder->header),
830 				out, out_pos, out_size);
831 		return coder->header_pos < sizeof(coder->header)
832 				? LZMA_OK : LZMA_STREAM_END;
833 	}
834 
835 	assert(0);
836 	return LZMA_PROG_ERROR;
837 }
838 
839 
840 static void
841 stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
842 {
843 	lzma_stream_coder *coder = coder_ptr;
844 
845 	// Threads must be killed before the output queue can be freed.
846 	threads_end(coder, allocator);
847 	lzma_outq_end(&coder->outq, allocator);
848 
849 	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
850 		lzma_free(coder->filters[i].options, allocator);
851 
852 	lzma_next_end(&coder->index_encoder, allocator);
853 	lzma_index_end(coder->index, allocator);
854 
855 	mythread_cond_destroy(&coder->cond);
856 	mythread_mutex_destroy(&coder->mutex);
857 
858 	lzma_free(coder, allocator);
859 	return;
860 }
861 
862 
863 /// Options handling for lzma_stream_encoder_mt_init() and
864 /// lzma_stream_encoder_mt_memusage()
865 static lzma_ret
866 get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
867 		const lzma_filter **filters, uint64_t *block_size,
868 		uint64_t *outbuf_size_max)
869 {
870 	// Validate some of the options.
871 	if (options == NULL)
872 		return LZMA_PROG_ERROR;
873 
874 	if (options->flags != 0 || options->threads == 0
875 			|| options->threads > LZMA_THREADS_MAX)
876 		return LZMA_OPTIONS_ERROR;
877 
878 	if (options->filters != NULL) {
879 		// Filter chain was given, use it as is.
880 		*filters = options->filters;
881 	} else {
882 		// Use a preset.
883 		if (lzma_easy_preset(opt_easy, options->preset))
884 			return LZMA_OPTIONS_ERROR;
885 
886 		*filters = opt_easy->filters;
887 	}
888 
889 	// Block size
890 	if (options->block_size > 0) {
891 		if (options->block_size > BLOCK_SIZE_MAX)
892 			return LZMA_OPTIONS_ERROR;
893 
894 		*block_size = options->block_size;
895 	} else {
896 		// Determine the Block size from the filter chain.
897 		*block_size = lzma_mt_block_size(*filters);
898 		if (*block_size == 0)
899 			return LZMA_OPTIONS_ERROR;
900 
901 		assert(*block_size <= BLOCK_SIZE_MAX);
902 	}
903 
904 	// Calculate the maximum amount output that a single output buffer
905 	// may need to hold. This is the same as the maximum total size of
906 	// a Block.
907 	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
908 	if (*outbuf_size_max == 0)
909 		return LZMA_MEM_ERROR;
910 
911 	return LZMA_OK;
912 }
913 
914 
915 static void
916 get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
917 {
918 	lzma_stream_coder *coder = coder_ptr;
919 
920 	// Lock coder->mutex to prevent finishing threads from moving their
921 	// progress info from the worker_thread structure to lzma_stream_coder.
922 	mythread_sync(coder->mutex) {
923 		*progress_in = coder->progress_in;
924 		*progress_out = coder->progress_out;
925 
926 		for (size_t i = 0; i < coder->threads_initialized; ++i) {
927 			mythread_sync(coder->threads[i].mutex) {
928 				*progress_in += coder->threads[i].progress_in;
929 				*progress_out += coder->threads[i]
930 						.progress_out;
931 			}
932 		}
933 	}
934 
935 	return;
936 }
937 
938 
939 static lzma_ret
940 stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
941 		const lzma_mt *options)
942 {
943 	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
944 
945 	// Get the filter chain.
946 	lzma_options_easy easy;
947 	const lzma_filter *filters;
948 	uint64_t block_size;
949 	uint64_t outbuf_size_max;
950 	return_if_error(get_options(options, &easy, &filters,
951 			&block_size, &outbuf_size_max));
952 
953 #if SIZE_MAX < UINT64_MAX
954 	if (block_size > SIZE_MAX)
955 		return LZMA_MEM_ERROR;
956 #endif
957 
958 	// Validate the filter chain so that we can give an error in this
959 	// function instead of delaying it to the first call to lzma_code().
960 	// The memory usage calculation verifies the filter chain as
961 	// a side effect so we take advantage of that.
962 	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
963 		return LZMA_OPTIONS_ERROR;
964 
965 	// Validate the Check ID.
966 	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
967 		return LZMA_PROG_ERROR;
968 
969 	if (!lzma_check_is_supported(options->check))
970 		return LZMA_UNSUPPORTED_CHECK;
971 
972 	// Allocate and initialize the base structure if needed.
973 	lzma_stream_coder *coder = next->coder;
974 	if (coder == NULL) {
975 		coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
976 		if (coder == NULL)
977 			return LZMA_MEM_ERROR;
978 
979 		next->coder = coder;
980 
981 		// For the mutex and condition variable initializations
982 		// the error handling has to be done here because
983 		// stream_encoder_mt_end() doesn't know if they have
984 		// already been initialized or not.
985 		if (mythread_mutex_init(&coder->mutex)) {
986 			lzma_free(coder, allocator);
987 			next->coder = NULL;
988 			return LZMA_MEM_ERROR;
989 		}
990 
991 		if (mythread_cond_init(&coder->cond)) {
992 			mythread_mutex_destroy(&coder->mutex);
993 			lzma_free(coder, allocator);
994 			next->coder = NULL;
995 			return LZMA_MEM_ERROR;
996 		}
997 
998 		next->code = &stream_encode_mt;
999 		next->end = &stream_encoder_mt_end;
1000 		next->get_progress = &get_progress;
1001 // 		next->update = &stream_encoder_mt_update;
1002 
1003 		coder->filters[0].id = LZMA_VLI_UNKNOWN;
1004 		coder->index_encoder = LZMA_NEXT_CODER_INIT;
1005 		coder->index = NULL;
1006 		memzero(&coder->outq, sizeof(coder->outq));
1007 		coder->threads = NULL;
1008 		coder->threads_max = 0;
1009 		coder->threads_initialized = 0;
1010 	}
1011 
1012 	// Basic initializations
1013 	coder->sequence = SEQ_STREAM_HEADER;
1014 	coder->block_size = (size_t)(block_size);
1015 	coder->thread_error = LZMA_OK;
1016 	coder->thr = NULL;
1017 
1018 	// Allocate the thread-specific base structures.
1019 	assert(options->threads > 0);
1020 	if (coder->threads_max != options->threads) {
1021 		threads_end(coder, allocator);
1022 
1023 		coder->threads = NULL;
1024 		coder->threads_max = 0;
1025 
1026 		coder->threads_initialized = 0;
1027 		coder->threads_free = NULL;
1028 
1029 		coder->threads = lzma_alloc(
1030 				options->threads * sizeof(worker_thread),
1031 				allocator);
1032 		if (coder->threads == NULL)
1033 			return LZMA_MEM_ERROR;
1034 
1035 		coder->threads_max = options->threads;
1036 	} else {
1037 		// Reuse the old structures and threads. Tell the running
1038 		// threads to stop and wait until they have stopped.
1039 		threads_stop(coder, true);
1040 	}
1041 
1042 	// Output queue
1043 	return_if_error(lzma_outq_init(&coder->outq, allocator,
1044 			outbuf_size_max, options->threads));
1045 
1046 	// Timeout
1047 	coder->timeout = options->timeout;
1048 
1049 	// Free the old filter chain and copy the new one.
1050 	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
1051 		lzma_free(coder->filters[i].options, allocator);
1052 
1053 	return_if_error(lzma_filters_copy(
1054 			filters, coder->filters, allocator));
1055 
1056 	// Index
1057 	lzma_index_end(coder->index, allocator);
1058 	coder->index = lzma_index_init(allocator);
1059 	if (coder->index == NULL)
1060 		return LZMA_MEM_ERROR;
1061 
1062 	// Stream Header
1063 	coder->stream_flags.version = 0;
1064 	coder->stream_flags.check = options->check;
1065 	return_if_error(lzma_stream_header_encode(
1066 			&coder->stream_flags, coder->header));
1067 
1068 	coder->header_pos = 0;
1069 
1070 	// Progress info
1071 	coder->progress_in = 0;
1072 	coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1073 
1074 	return LZMA_OK;
1075 }
1076 
1077 
1078 extern LZMA_API(lzma_ret)
1079 lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1080 {
1081 	lzma_next_strm_init(stream_encoder_mt_init, strm, options);
1082 
1083 	strm->internal->supported_actions[LZMA_RUN] = true;
1084 // 	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1085 	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
1086 	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
1087 	strm->internal->supported_actions[LZMA_FINISH] = true;
1088 
1089 	return LZMA_OK;
1090 }
1091 
1092 
1093 // This function name is a monster but it's consistent with the older
1094 // monster names. :-( 31 chars is the max that C99 requires so in that
1095 // sense it's not too long. ;-)
1096 extern LZMA_API(uint64_t)
1097 lzma_stream_encoder_mt_memusage(const lzma_mt *options)
1098 {
1099 	lzma_options_easy easy;
1100 	const lzma_filter *filters;
1101 	uint64_t block_size;
1102 	uint64_t outbuf_size_max;
1103 
1104 	if (get_options(options, &easy, &filters, &block_size,
1105 			&outbuf_size_max) != LZMA_OK)
1106 		return UINT64_MAX;
1107 
1108 	// Memory usage of the input buffers
1109 	const uint64_t inbuf_memusage = options->threads * block_size;
1110 
1111 	// Memory usage of the filter encoders
1112 	uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1113 	if (filters_memusage == UINT64_MAX)
1114 		return UINT64_MAX;
1115 
1116 	filters_memusage *= options->threads;
1117 
1118 	// Memory usage of the output queue
1119 	const uint64_t outq_memusage = lzma_outq_memusage(
1120 			outbuf_size_max, options->threads);
1121 	if (outq_memusage == UINT64_MAX)
1122 		return UINT64_MAX;
1123 
1124 	// Sum them with overflow checking.
1125 	uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1126 			+ sizeof(lzma_stream_coder)
1127 			+ options->threads * sizeof(worker_thread);
1128 
1129 	if (UINT64_MAX - total_memusage < inbuf_memusage)
1130 		return UINT64_MAX;
1131 
1132 	total_memusage += inbuf_memusage;
1133 
1134 	if (UINT64_MAX - total_memusage < filters_memusage)
1135 		return UINT64_MAX;
1136 
1137 	total_memusage += filters_memusage;
1138 
1139 	if (UINT64_MAX - total_memusage < outq_memusage)
1140 		return UINT64_MAX;
1141 
1142 	return total_memusage + outq_memusage;
1143 }
1144