1 // SPDX-License-Identifier: 0BSD
2
3 ///////////////////////////////////////////////////////////////////////////////
4 //
5 /// \file stream_encoder_mt.c
6 /// \brief Multithreaded .xz Stream encoder
7 //
8 // Author: Lasse Collin
9 //
10 ///////////////////////////////////////////////////////////////////////////////
11
12 #include "filter_encoder.h"
13 #include "easy_preset.h"
14 #include "block_encoder.h"
15 #include "block_buffer_encoder.h"
16 #include "index_encoder.h"
17 #include "outqueue.h"
18
19
20 /// Maximum supported block size. This makes it simpler to prevent integer
21 /// overflows if we are given unusually large block size.
22 #define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
23
24
typedef enum {
	/// Waiting for work.
	THR_IDLE,

	/// Encoding is in progress.
	THR_RUN,

	/// Encoding is in progress but no more input data will
	/// be read.
	THR_FINISH,

	/// The main thread wants the thread to stop whatever it was doing
	/// but not exit.
	THR_STOP,

	/// The main thread wants the thread to exit. We could use
	/// cancellation but since there's THR_STOP anyway, this is lazier.
	THR_EXIT,

} worker_state;
45
typedef struct lzma_stream_coder_s lzma_stream_coder;

typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
	/// Current state of this thread. Modifications and reads are
	/// done with "mutex" below locked.
	worker_state state;

	/// Input buffer of coder->block_size bytes. The main thread will
	/// put new input into this and update in_size accordingly. Once
	/// no more input is coming, state will be set to THR_FINISH.
	uint8_t *in;

	/// Amount of data available in the input buffer. This is modified
	/// only by the main thread.
	size_t in_size;

	/// Output buffer for this thread. This is set by the main
	/// thread every time a new Block is started with this thread
	/// structure.
	lzma_outbuf *outbuf;

	/// Pointer to the main structure is needed when putting this
	/// thread back to the stack of free threads.
	lzma_stream_coder *coder;

	/// The allocator is set by the main thread. Since a copy of the
	/// pointer is kept here, the application must not change the
	/// allocator before calling lzma_end().
	const lzma_allocator *allocator;

	/// Amount of uncompressed data that has already been compressed.
	uint64_t progress_in;

	/// Amount of compressed data that is ready.
	uint64_t progress_out;

	/// Block encoder
	lzma_next_coder block_encoder;

	/// Compression options for this Block
	lzma_block block_options;

	/// Filter chain for this thread. By copying the filters array
	/// to each thread it is possible to change the filter chain
	/// between Blocks using lzma_filters_update().
	lzma_filter filters[LZMA_FILTERS_MAX + 1];

	/// Next structure in the stack of free worker threads.
	worker_thread *next;

	/// Mutex protecting "state", "in_size", "progress_in", and
	/// "progress_out" above; "cond" is signaled when they change.
	mythread_mutex mutex;
	mythread_cond cond;

	/// The ID of this thread is used to join the thread
	/// when it's not needed anymore.
	mythread thread_id;
};
102
103
struct lzma_stream_coder_s {
	enum {
		/// Copying the Stream Header to the output.
		SEQ_STREAM_HEADER,

		/// Encoding Blocks in worker threads and copying
		/// finished output buffers to the application.
		SEQ_BLOCK,

		/// Encoding the Index field.
		SEQ_INDEX,

		/// Copying the Stream Footer to the output.
		SEQ_STREAM_FOOTER,
	} sequence;

	/// Start a new Block every block_size bytes of input unless
	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
	size_t block_size;

	/// The filter chain to use for the next Block.
	/// This can be updated using lzma_filters_update()
	/// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
	lzma_filter filters[LZMA_FILTERS_MAX + 1];

	/// A copy of filters[] will be put here when attempting to get
	/// a new worker thread. This will be copied to a worker thread
	/// when a thread becomes free and then this cache is marked as
	/// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
	/// the filter options from filters[] would get uselessly copied
	/// multiple times (allocated and freed) when waiting for a new free
	/// worker thread.
	///
	/// This is freed if filters[] is updated via lzma_filters_update().
	lzma_filter filters_cache[LZMA_FILTERS_MAX + 1];


	/// Index to hold sizes of the Blocks
	lzma_index *index;

	/// Index encoder
	lzma_next_coder index_encoder;


	/// Stream Flags for encoding the Stream Header and Stream Footer.
	lzma_stream_flags stream_flags;

	/// Buffer to hold Stream Header and Stream Footer.
	uint8_t header[LZMA_STREAM_HEADER_SIZE];

	/// Read position in header[]
	size_t header_pos;


	/// Output buffer queue for compressed data
	lzma_outq outq;

	/// How much memory to allocate for each lzma_outbuf.buf
	size_t outbuf_alloc_size;


	/// Maximum wait time if cannot use all the input and cannot
	/// fill the output buffer. This is in milliseconds.
	uint32_t timeout;


	/// Error code from a worker thread. The first error wins;
	/// protected by "mutex" below.
	lzma_ret thread_error;

	/// Array of allocated thread-specific structures
	worker_thread *threads;

	/// Number of structures in "threads" above. This is also the
	/// number of threads that will be created at maximum.
	uint32_t threads_max;

	/// Number of thread structures that have been initialized, and
	/// thus the number of worker threads actually created so far.
	uint32_t threads_initialized;

	/// Stack of free threads. When a thread finishes, it puts itself
	/// back into this stack. This starts as empty because threads
	/// are created only when actually needed.
	worker_thread *threads_free;

	/// The most recent worker thread to which the main thread writes
	/// the new input from the application.
	worker_thread *thr;


	/// Amount of uncompressed data in Blocks that have already
	/// been finished.
	uint64_t progress_in;

	/// Amount of compressed data in Stream Header + Blocks that
	/// have already been finished.
	uint64_t progress_out;


	/// Mutex and condition variable protecting thread_error,
	/// threads_free, and the progress counters above.
	mythread_mutex mutex;
	mythread_cond cond;
};
198
199
200 /// Tell the main thread that something has gone wrong.
201 static void
worker_error(worker_thread * thr,lzma_ret ret)202 worker_error(worker_thread *thr, lzma_ret ret)
203 {
204 assert(ret != LZMA_OK);
205 assert(ret != LZMA_STREAM_END);
206
207 mythread_sync(thr->coder->mutex) {
208 if (thr->coder->thread_error == LZMA_OK)
209 thr->coder->thread_error = ret;
210
211 mythread_cond_signal(&thr->coder->cond);
212 }
213
214 return;
215 }
216
217
/// Encode one Block in a worker thread.
///
/// On success returns THR_FINISH and fills in thr->outbuf's size fields.
/// If the main thread requested stopping or exiting, returns that state.
/// On an encoding error the error is reported with worker_error() and
/// THR_STOP is returned.
static worker_state
worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
{
	assert(thr->progress_in == 0);
	assert(thr->progress_out == 0);

	// Set the Block options.
	thr->block_options = (lzma_block){
		.version = 0,
		.check = thr->coder->stream_flags.check,
		.compressed_size = thr->outbuf->allocated,
		.uncompressed_size = thr->coder->block_size,
		.filters = thr->filters,
	};

	// Calculate maximum size of the Block Header. This amount is
	// reserved in the beginning of the buffer so that Block Header
	// along with Compressed Size and Uncompressed Size can be
	// written there.
	lzma_ret ret = lzma_block_header_size(&thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Initialize the Block encoder.
	ret = lzma_block_encoder_init(&thr->block_encoder,
			thr->allocator, &thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	size_t in_pos = 0;
	size_t in_size = 0;

	// Compressed output goes after the space reserved for
	// the Block Header.
	*out_pos = thr->block_options.header_size;
	const size_t out_size = thr->outbuf->allocated;

	do {
		mythread_sync(thr->mutex) {
			// Store in_pos and *out_pos into *thr so that
			// an application may read them via
			// lzma_get_progress() to get progress information.
			//
			// NOTE: These aren't updated when the encoding
			// finishes. Instead, the final values are taken
			// later from thr->outbuf.
			thr->progress_in = in_pos;
			thr->progress_out = *out_pos;

			while (in_size == thr->in_size
					&& thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		// Return if we were asked to stop or exit.
		if (state >= THR_STOP)
			return state;

		lzma_action action = state == THR_FINISH
				? LZMA_FINISH : LZMA_RUN;

		// Limit the amount of input given to the Block encoder
		// at once. This way this thread can react fairly quickly
		// if the main thread wants us to stop or exit.
		static const size_t in_chunk_max = 16384;
		size_t in_limit = in_size;
		if (in_size - in_pos > in_chunk_max) {
			in_limit = in_pos + in_chunk_max;
			action = LZMA_RUN;
		}

		ret = thr->block_encoder.code(
				thr->block_encoder.coder, thr->allocator,
				thr->in, &in_pos, in_limit, thr->outbuf->buf,
				out_pos, out_size, action);
	} while (ret == LZMA_OK && *out_pos < out_size);

	switch (ret) {
	case LZMA_STREAM_END:
		assert(state == THR_FINISH);

		// Encode the Block Header. By doing it after
		// the compression, we can store the Compressed Size
		// and Uncompressed Size fields.
		ret = lzma_block_header_encode(&thr->block_options,
				thr->outbuf->buf);
		if (ret != LZMA_OK) {
			worker_error(thr, ret);
			return THR_STOP;
		}

		break;

	case LZMA_OK:
		// The data was incompressible. Encode it using uncompressed
		// LZMA2 chunks.
		//
		// First wait that we have gotten all the input.
		mythread_sync(thr->mutex) {
			while (thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		if (state >= THR_STOP)
			return state;

		// Do the encoding. This takes care of the Block Header too.
		*out_pos = 0;
		ret = lzma_block_uncomp_encode(&thr->block_options,
				thr->in, in_size, thr->outbuf->buf,
				out_pos, out_size);

		// It shouldn't fail.
		if (ret != LZMA_OK) {
			worker_error(thr, LZMA_PROG_ERROR);
			return THR_STOP;
		}

		break;

	default:
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Set the size information that will be read by the main thread
	// to write the Index field.
	thr->outbuf->unpadded_size
			= lzma_block_unpadded_size(&thr->block_options);
	assert(thr->outbuf->unpadded_size != 0);
	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

	return THR_FINISH;
}
360
361
/// The thread function of a worker thread. It loops waiting for work,
/// encodes one Block per round with worker_encode(), and returns the
/// thread structure to the stack of free threads. The loop ends when
/// the main thread sets the state to THR_EXIT; the thread then frees
/// its own resources before returning.
static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
	worker_thread *thr = thr_ptr;
	worker_state state = THR_IDLE; // Init to silence a warning

	while (true) {
		// Wait for work.
		mythread_sync(thr->mutex) {
			while (true) {
				// The thread is already idle so if we are
				// requested to stop, just set the state.
				if (thr->state == THR_STOP) {
					thr->state = THR_IDLE;
					mythread_cond_signal(&thr->cond);
				}

				state = thr->state;
				if (state != THR_IDLE)
					break;

				mythread_cond_wait(&thr->cond, &thr->mutex);
			}
		}

		size_t out_pos = 0;

		assert(state != THR_IDLE);
		assert(state != THR_STOP);

		// THR_RUN and THR_FINISH mean there is a Block to encode.
		if (state <= THR_FINISH)
			state = worker_encode(thr, &out_pos, state);

		if (state == THR_EXIT)
			break;

		// Mark the thread as idle unless the main thread has
		// told us to exit. Signal is needed for the case
		// where the main thread is waiting for the threads to stop.
		mythread_sync(thr->mutex) {
			if (thr->state != THR_EXIT) {
				thr->state = THR_IDLE;
				mythread_cond_signal(&thr->cond);
			}
		}

		mythread_sync(thr->coder->mutex) {
			// If no errors occurred, make the encoded data
			// available to be copied out.
			if (state == THR_FINISH) {
				thr->outbuf->pos = out_pos;
				thr->outbuf->finished = true;
			}

			// Update the main progress info.
			thr->coder->progress_in
					+= thr->outbuf->uncompressed_size;
			thr->coder->progress_out += out_pos;
			thr->progress_in = 0;
			thr->progress_out = 0;

			// Return this thread to the stack of free threads.
			thr->next = thr->coder->threads_free;
			thr->coder->threads_free = thr;

			mythread_cond_signal(&thr->coder->cond);
		}
	}

	// Exiting, free the resources.
	lzma_filters_free(thr->filters, thr->allocator);

	mythread_mutex_destroy(&thr->mutex);
	mythread_cond_destroy(&thr->cond);

	lzma_next_end(&thr->block_encoder, thr->allocator);
	lzma_free(thr->in, thr->allocator);
	return MYTHREAD_RET_VALUE;
}
441
442
443 /// Make the threads stop but not exit. Optionally wait for them to stop.
444 static void
threads_stop(lzma_stream_coder * coder,bool wait_for_threads)445 threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
446 {
447 // Tell the threads to stop.
448 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
449 mythread_sync(coder->threads[i].mutex) {
450 coder->threads[i].state = THR_STOP;
451 mythread_cond_signal(&coder->threads[i].cond);
452 }
453 }
454
455 if (!wait_for_threads)
456 return;
457
458 // Wait for the threads to settle in the idle state.
459 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
460 mythread_sync(coder->threads[i].mutex) {
461 while (coder->threads[i].state != THR_IDLE)
462 mythread_cond_wait(&coder->threads[i].cond,
463 &coder->threads[i].mutex);
464 }
465 }
466
467 return;
468 }
469
470
471 /// Stop the threads and free the resources associated with them.
472 /// Wait until the threads have exited.
473 static void
threads_end(lzma_stream_coder * coder,const lzma_allocator * allocator)474 threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
475 {
476 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
477 mythread_sync(coder->threads[i].mutex) {
478 coder->threads[i].state = THR_EXIT;
479 mythread_cond_signal(&coder->threads[i].cond);
480 }
481 }
482
483 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
484 int ret = mythread_join(coder->threads[i].thread_id);
485 assert(ret == 0);
486 (void)ret;
487 }
488
489 lzma_free(coder->threads, allocator);
490 return;
491 }
492
493
/// Initialize a new worker_thread structure and create a new thread.
///
/// On success, coder->thr points to the new thread and
/// coder->threads_initialized is incremented. On failure everything
/// allocated here is freed again and LZMA_MEM_ERROR is returned.
static lzma_ret
initialize_new_thread(lzma_stream_coder *coder,
		const lzma_allocator *allocator)
{
	// The next uninitialized element of the preallocated array.
	worker_thread *thr = &coder->threads[coder->threads_initialized];

	// The input buffer must exist before the thread is started.
	thr->in = lzma_alloc(coder->block_size, allocator);
	if (thr->in == NULL)
		return LZMA_MEM_ERROR;

	if (mythread_mutex_init(&thr->mutex))
		goto error_mutex;

	if (mythread_cond_init(&thr->cond))
		goto error_cond;

	thr->state = THR_IDLE;
	thr->allocator = allocator;
	thr->coder = coder;
	thr->progress_in = 0;
	thr->progress_out = 0;
	thr->block_encoder = LZMA_NEXT_CODER_INIT;
	// Mark the per-thread filter chain as empty.
	thr->filters[0].id = LZMA_VLI_UNKNOWN;

	if (mythread_create(&thr->thread_id, &worker_start, thr))
		goto error_thread;

	++coder->threads_initialized;
	coder->thr = thr;

	return LZMA_OK;

// Error paths: unwind in the reverse order of initialization.
error_thread:
	mythread_cond_destroy(&thr->cond);

error_cond:
	mythread_mutex_destroy(&thr->mutex);

error_mutex:
	lzma_free(thr->in, allocator);
	return LZMA_MEM_ERROR;
}
537
538
/// Try to assign a worker thread to coder->thr and prepare it for
/// a new Block.
///
/// It is not an error if no thread or output buffer is available yet;
/// in that case coder->thr is left NULL and LZMA_OK is returned.
/// A non-OK return means a real error (e.g. memory allocation failure).
static lzma_ret
get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
	// If there are no free output subqueues, there is no
	// point to try getting a thread.
	if (!lzma_outq_has_buf(&coder->outq))
		return LZMA_OK;

	// That's also true if we cannot allocate memory for the output
	// buffer in the output queue.
	return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
			coder->outbuf_alloc_size));

	// Make a thread-specific copy of the filter chain. Put it in
	// the cache array first so that if we cannot get a new thread yet,
	// the allocation is ready when we try again.
	if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN)
		return_if_error(lzma_filters_copy(
			coder->filters, coder->filters_cache, allocator));

	// If there is a free structure on the stack, use it.
	mythread_sync(coder->mutex) {
		if (coder->threads_free != NULL) {
			coder->thr = coder->threads_free;
			coder->threads_free = coder->threads_free->next;
		}
	}

	if (coder->thr == NULL) {
		// If there are no uninitialized structures left, return.
		if (coder->threads_initialized == coder->threads_max)
			return LZMA_OK;

		// Initialize a new thread.
		return_if_error(initialize_new_thread(coder, allocator));
	}

	// Reset the parts of the thread state that have to be done
	// in the main thread.
	mythread_sync(coder->thr->mutex) {
		coder->thr->state = THR_RUN;
		coder->thr->in_size = 0;
		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);

		// Free the old thread-specific filter options and replace
		// them with the already-allocated new options from
		// coder->filters_cache[]. Then mark the cache as empty.
		lzma_filters_free(coder->thr->filters, allocator);
		memcpy(coder->thr->filters, coder->filters_cache,
				sizeof(coder->filters_cache));
		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;

		mythread_cond_signal(&coder->thr->cond);
	}

	return LZMA_OK;
}
596
597
/// Feed new input from the application to worker threads.
///
/// Returns LZMA_OK when all input was consumed or when no worker
/// thread could be made available yet. Returns an error code if
/// getting a thread failed or if a worker thread has reported an
/// error via coder->thread_error.
static lzma_ret
stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, lzma_action action)
{
	while (*in_pos < in_size
			|| (coder->thr != NULL && action != LZMA_RUN)) {
		if (coder->thr == NULL) {
			// Get a new thread.
			const lzma_ret ret = get_thread(coder, allocator);
			if (coder->thr == NULL)
				return ret;
		}

		// Copy the input data to thread's buffer.
		size_t thr_in_size = coder->thr->in_size;
		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
				&thr_in_size, coder->block_size);

		// Tell the Block encoder to finish if
		//  - it has got block_size bytes of input; or
		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
		//    or LZMA_FULL_BARRIER was used.
		//
		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
		const bool finish = thr_in_size == coder->block_size
				|| (*in_pos == in_size && action != LZMA_RUN);

		bool block_error = false;

		mythread_sync(coder->thr->mutex) {
			if (coder->thr->state == THR_IDLE) {
				// Something has gone wrong with the Block
				// encoder. It has set coder->thread_error
				// which we will read a few lines later.
				block_error = true;
			} else {
				// Tell the Block encoder its new amount
				// of input and update the state if needed.
				coder->thr->in_size = thr_in_size;

				if (finish)
					coder->thr->state = THR_FINISH;

				mythread_cond_signal(&coder->thr->cond);
			}
		}

		if (block_error) {
			lzma_ret ret = LZMA_OK; // Init to silence a warning.

			mythread_sync(coder->mutex) {
				ret = coder->thread_error;
			}

			return ret;
		}

		// The current Block is full; the next input byte will
		// need a new thread.
		if (finish)
			coder->thr = NULL;
	}

	return LZMA_OK;
}
662
663
/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached.
///
/// Returns true if and only if the wait ended because the timeout
/// (coder->timeout milliseconds) expired.
static bool
wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
		bool *has_blocked, bool has_input)
{
	if (coder->timeout != 0 && !*has_blocked) {
		// Every time when stream_encode_mt() is called via
		// lzma_code(), *has_blocked starts as false. We set it
		// to true here and calculate the absolute time when
		// we must return if there's nothing to do.
		//
		// This way if we block multiple times for short moments
		// less than "timeout" milliseconds, we will return once
		// "timeout" amount of time has passed since the *first*
		// blocking occurred. If the absolute time was calculated
		// again every time we block, "timeout" would effectively
		// be meaningless if we never consecutively block longer
		// than "timeout" ms.
		*has_blocked = true;
		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
	}

	bool timed_out = false;

	mythread_sync(coder->mutex) {
		// There are four things that we wait. If one of them
		// becomes possible, we return.
		//  - If there is input left, we need to get a free
		//    worker thread and an output buffer for it.
		//  - Data ready to be read from the output queue.
		//  - A worker thread indicates an error.
		//  - Time out occurs.
		while ((!has_input || coder->threads_free == NULL
					|| !lzma_outq_has_buf(&coder->outq))
				&& !lzma_outq_is_readable(&coder->outq)
				&& coder->thread_error == LZMA_OK
				&& !timed_out) {
			if (coder->timeout != 0)
				timed_out = mythread_cond_timedwait(
						&coder->cond, &coder->mutex,
						wait_abs) != 0;
			else
				mythread_cond_wait(&coder->cond,
						&coder->mutex);
		}
	}

	return timed_out;
}
714
715
/// The main encoding function (lzma_code() ends up here via next->code).
/// Runs a four-state sequence: Stream Header, Blocks (distributed to
/// worker threads), Index, and Stream Footer.
static lzma_ret
stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, uint8_t *restrict out,
		size_t *restrict out_pos, size_t out_size, lzma_action action)
{
	lzma_stream_coder *coder = coder_ptr;

	switch (coder->sequence) {
	case SEQ_STREAM_HEADER:
		// Copy the pre-encoded Stream Header from header[].
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		if (coder->header_pos < sizeof(coder->header))
			return LZMA_OK;

		coder->header_pos = 0;
		coder->sequence = SEQ_BLOCK;
		FALLTHROUGH;

	case SEQ_BLOCK: {
		// Initialized to silence warnings.
		lzma_vli unpadded_size = 0;
		lzma_vli uncompressed_size = 0;
		lzma_ret ret = LZMA_OK;

		// These are for wait_for_work().
		bool has_blocked = false;
		mythread_condtime wait_abs = { 0 };

		while (true) {
			mythread_sync(coder->mutex) {
				// Check for Block encoder errors.
				ret = coder->thread_error;
				if (ret != LZMA_OK) {
					assert(ret != LZMA_STREAM_END);
					break; // Break out of mythread_sync.
				}

				// Try to read compressed data to out[].
				ret = lzma_outq_read(&coder->outq, allocator,
						out, out_pos, out_size,
						&unpadded_size,
						&uncompressed_size);
			}

			if (ret == LZMA_STREAM_END) {
				// End of Block. Add it to the Index.
				ret = lzma_index_append(coder->index,
						allocator, unpadded_size,
						uncompressed_size);
				if (ret != LZMA_OK) {
					threads_stop(coder, false);
					return ret;
				}

				// If we didn't fill the output buffer yet,
				// try to read more data. Maybe the next
				// outbuf has been finished already too.
				if (*out_pos < out_size)
					continue;
			}

			if (ret != LZMA_OK) {
				// coder->thread_error was set.
				threads_stop(coder, false);
				return ret;
			}

			// Try to give uncompressed data to a worker thread.
			ret = stream_encode_in(coder, allocator,
					in, in_pos, in_size, action);
			if (ret != LZMA_OK) {
				threads_stop(coder, false);
				return ret;
			}

			// See if we should wait or return.
			//
			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
			if (*in_pos == in_size) {
				// LZMA_RUN: More data is probably coming
				// so return to let the caller fill the
				// input buffer.
				if (action == LZMA_RUN)
					return LZMA_OK;

				// LZMA_FULL_BARRIER: The same as with
				// LZMA_RUN but tell the caller that the
				// barrier was completed.
				if (action == LZMA_FULL_BARRIER)
					return LZMA_STREAM_END;

				// Finishing or flushing isn't completed until
				// all input data has been encoded and copied
				// to the output buffer.
				if (lzma_outq_is_empty(&coder->outq)) {
					// LZMA_FINISH: Continue to encode
					// the Index field.
					if (action == LZMA_FINISH)
						break;

					// LZMA_FULL_FLUSH: Return to tell
					// the caller that flushing was
					// completed.
					if (action == LZMA_FULL_FLUSH)
						return LZMA_STREAM_END;
				}
			}

			// Return if there is no output space left.
			// This check must be done after testing the input
			// buffer, because we might want to use a different
			// return code.
			if (*out_pos == out_size)
				return LZMA_OK;

			// Neither in nor out has been used completely.
			// Wait until there's something we can do.
			if (wait_for_work(coder, &wait_abs, &has_blocked,
					*in_pos < in_size))
				return LZMA_TIMED_OUT;
		}

		// All Blocks have been encoded and the threads have stopped.
		// Prepare to encode the Index field.
		return_if_error(lzma_index_encoder_init(
				&coder->index_encoder, allocator,
				coder->index));
		coder->sequence = SEQ_INDEX;

		// Update the progress info to take the Index and
		// Stream Footer into account. Those are very fast to encode
		// so in terms of progress information they can be thought
		// to be ready to be copied out.
		coder->progress_out += lzma_index_size(coder->index)
				+ LZMA_STREAM_HEADER_SIZE;

		FALLTHROUGH;
	}

	case SEQ_INDEX: {
		// Call the Index encoder. It doesn't take any input, so
		// those pointers can be NULL.
		const lzma_ret ret = coder->index_encoder.code(
				coder->index_encoder.coder, allocator,
				NULL, NULL, 0,
				out, out_pos, out_size, LZMA_RUN);
		if (ret != LZMA_STREAM_END)
			return ret;

		// Encode the Stream Footer into coder->header[], which is
		// reused for it now that the Stream Header has been written.
		coder->stream_flags.backward_size
				= lzma_index_size(coder->index);
		if (lzma_stream_footer_encode(&coder->stream_flags,
				coder->header) != LZMA_OK)
			return LZMA_PROG_ERROR;

		coder->sequence = SEQ_STREAM_FOOTER;
		FALLTHROUGH;
	}

	case SEQ_STREAM_FOOTER:
		// Copy the Stream Footer to the output.
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		return coder->header_pos < sizeof(coder->header)
				? LZMA_OK : LZMA_STREAM_END;
	}

	assert(0);
	return LZMA_PROG_ERROR;
}
889
890
891 static void
stream_encoder_mt_end(void * coder_ptr,const lzma_allocator * allocator)892 stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
893 {
894 lzma_stream_coder *coder = coder_ptr;
895
896 // Threads must be killed before the output queue can be freed.
897 threads_end(coder, allocator);
898 lzma_outq_end(&coder->outq, allocator);
899
900 lzma_filters_free(coder->filters, allocator);
901 lzma_filters_free(coder->filters_cache, allocator);
902
903 lzma_next_end(&coder->index_encoder, allocator);
904 lzma_index_end(coder->index, allocator);
905
906 mythread_cond_destroy(&coder->cond);
907 mythread_mutex_destroy(&coder->mutex);
908
909 lzma_free(coder, allocator);
910 return;
911 }
912
913
914 static lzma_ret
stream_encoder_mt_update(void * coder_ptr,const lzma_allocator * allocator,const lzma_filter * filters,const lzma_filter * reversed_filters lzma_attribute ((__unused__)))915 stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator,
916 const lzma_filter *filters,
917 const lzma_filter *reversed_filters
918 lzma_attribute((__unused__)))
919 {
920 lzma_stream_coder *coder = coder_ptr;
921
922 // Applications shouldn't attempt to change the options when
923 // we are already encoding the Index or Stream Footer.
924 if (coder->sequence > SEQ_BLOCK)
925 return LZMA_PROG_ERROR;
926
927 // For now the threaded encoder doesn't support changing
928 // the options in the middle of a Block.
929 if (coder->thr != NULL)
930 return LZMA_PROG_ERROR;
931
932 // Check if the filter chain seems mostly valid. See the comment
933 // in stream_encoder_mt_init().
934 if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
935 return LZMA_OPTIONS_ERROR;
936
937 // Make a copy to a temporary buffer first. This way the encoder
938 // state stays unchanged if an error occurs in lzma_filters_copy().
939 lzma_filter temp[LZMA_FILTERS_MAX + 1];
940 return_if_error(lzma_filters_copy(filters, temp, allocator));
941
942 // Free the options of the old chain as well as the cache.
943 lzma_filters_free(coder->filters, allocator);
944 lzma_filters_free(coder->filters_cache, allocator);
945
946 // Copy the new filter chain in place.
947 memcpy(coder->filters, temp, sizeof(temp));
948
949 return LZMA_OK;
950 }
951
952
953 /// Options handling for lzma_stream_encoder_mt_init() and
954 /// lzma_stream_encoder_mt_memusage()
955 static lzma_ret
get_options(const lzma_mt * options,lzma_options_easy * opt_easy,const lzma_filter ** filters,uint64_t * block_size,uint64_t * outbuf_size_max)956 get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
957 const lzma_filter **filters, uint64_t *block_size,
958 uint64_t *outbuf_size_max)
959 {
960 // Validate some of the options.
961 if (options == NULL)
962 return LZMA_PROG_ERROR;
963
964 if (options->flags != 0 || options->threads == 0
965 || options->threads > LZMA_THREADS_MAX)
966 return LZMA_OPTIONS_ERROR;
967
968 if (options->filters != NULL) {
969 // Filter chain was given, use it as is.
970 *filters = options->filters;
971 } else {
972 // Use a preset.
973 if (lzma_easy_preset(opt_easy, options->preset))
974 return LZMA_OPTIONS_ERROR;
975
976 *filters = opt_easy->filters;
977 }
978
979 // If the Block size is not set, determine it from the filter chain.
980 if (options->block_size > 0)
981 *block_size = options->block_size;
982 else
983 *block_size = lzma_mt_block_size(*filters);
984
985 // UINT64_MAX > BLOCK_SIZE_MAX, so the second condition
986 // should be optimized out by any reasonable compiler.
987 // The second condition should be there in the unlikely event that
988 // the macros change and UINT64_MAX < BLOCK_SIZE_MAX.
989 if (*block_size > BLOCK_SIZE_MAX || *block_size == UINT64_MAX)
990 return LZMA_OPTIONS_ERROR;
991
992 // Calculate the maximum amount output that a single output buffer
993 // may need to hold. This is the same as the maximum total size of
994 // a Block.
995 *outbuf_size_max = lzma_block_buffer_bound64(*block_size);
996 if (*outbuf_size_max == 0)
997 return LZMA_MEM_ERROR;
998
999 return LZMA_OK;
1000 }
1001
1002
1003 static void
get_progress(void * coder_ptr,uint64_t * progress_in,uint64_t * progress_out)1004 get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
1005 {
1006 lzma_stream_coder *coder = coder_ptr;
1007
1008 // Lock coder->mutex to prevent finishing threads from moving their
1009 // progress info from the worker_thread structure to lzma_stream_coder.
1010 mythread_sync(coder->mutex) {
1011 *progress_in = coder->progress_in;
1012 *progress_out = coder->progress_out;
1013
1014 for (size_t i = 0; i < coder->threads_initialized; ++i) {
1015 mythread_sync(coder->threads[i].mutex) {
1016 *progress_in += coder->threads[i].progress_in;
1017 *progress_out += coder->threads[i]
1018 .progress_out;
1019 }
1020 }
1021 }
1022
1023 return;
1024 }
1025
1026
/// \brief      Initialize or reinitialize the multithreaded Stream encoder
///
/// On the first call the base structure, its mutex, and its condition
/// variable are allocated and initialized; on reinitialization the existing
/// structures (and worker threads, when the thread count is unchanged) are
/// reused.
///
/// \param      next       Coder chain slot to fill; next->coder may already
///                        hold a lzma_stream_coder from an earlier init
/// \param      allocator  Custom allocator or NULL for malloc()/free()
/// \param      options    Multithreaded encoder options (thread count,
///                        block size, timeout, filters/preset, check)
///
/// \return     LZMA_OK on success. LZMA_MEM_ERROR, LZMA_OPTIONS_ERROR,
///             LZMA_UNSUPPORTED_CHECK, LZMA_PROG_ERROR, or whatever the
///             return_if_error()-wrapped helpers report on failure.
static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
		const lzma_mt *options)
{
	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);

	// Get the filter chain.
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;
	return_if_error(get_options(options, &easy, &filters,
			&block_size, &outbuf_size_max));

#if SIZE_MAX < UINT64_MAX
	// On 32-bit systems the validated 64-bit sizes may still exceed
	// what can be addressed; treat that as running out of memory.
	if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
		return LZMA_MEM_ERROR;
#endif

	// Validate the filter chain so that we can give an error in this
	// function instead of delaying it to the first call to lzma_code().
	// The memory usage calculation verifies the filter chain as
	// a side effect so we take advantage of that. It's not a perfect
	// check though as raw encoder allows LZMA1 too but such problems
	// will be caught eventually with Block Header encoder.
	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Validate the Check ID.
	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
		return LZMA_PROG_ERROR;

	if (!lzma_check_is_supported(options->check))
		return LZMA_UNSUPPORTED_CHECK;

	// Allocate and initialize the base structure if needed.
	lzma_stream_coder *coder = next->coder;
	if (coder == NULL) {
		coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
		if (coder == NULL)
			return LZMA_MEM_ERROR;

		next->coder = coder;

		// For the mutex and condition variable initializations
		// the error handling has to be done here because
		// stream_encoder_mt_end() doesn't know if they have
		// already been initialized or not.
		if (mythread_mutex_init(&coder->mutex)) {
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		if (mythread_cond_init(&coder->cond)) {
			mythread_mutex_destroy(&coder->mutex);
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		next->code = &stream_encode_mt;
		next->end = &stream_encoder_mt_end;
		next->get_progress = &get_progress;
		next->update = &stream_encoder_mt_update;

		// Set the members to safe "empty" values, presumably so
		// that cleanup stays safe if a later step below fails
		// (consistent with the stream_encoder_mt_end() note above).
		coder->filters[0].id = LZMA_VLI_UNKNOWN;
		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
		coder->index_encoder = LZMA_NEXT_CODER_INIT;
		coder->index = NULL;
		memzero(&coder->outq, sizeof(coder->outq));
		coder->threads = NULL;
		coder->threads_max = 0;
		coder->threads_initialized = 0;
	}

	// Basic initializations
	coder->sequence = SEQ_STREAM_HEADER;
	coder->block_size = (size_t)(block_size);
	coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
	coder->thread_error = LZMA_OK;
	coder->thr = NULL;

	// Allocate the thread-specific base structures.
	assert(options->threads > 0);
	if (coder->threads_max != options->threads) {
		// Thread count changed: tear down the old threads and
		// allocate a fresh array of worker_thread structures.
		threads_end(coder, allocator);

		coder->threads = NULL;
		coder->threads_max = 0;

		coder->threads_initialized = 0;
		coder->threads_free = NULL;

		coder->threads = lzma_alloc(
				options->threads * sizeof(worker_thread),
				allocator);
		if (coder->threads == NULL)
			return LZMA_MEM_ERROR;

		coder->threads_max = options->threads;
	} else {
		// Reuse the old structures and threads. Tell the running
		// threads to stop and wait until they have stopped.
		threads_stop(coder, true);
	}

	// Output queue
	return_if_error(lzma_outq_init(&coder->outq, allocator,
			options->threads));

	// Timeout
	coder->timeout = options->timeout;

	// Free the old filter chain and the cache.
	lzma_filters_free(coder->filters, allocator);
	lzma_filters_free(coder->filters_cache, allocator);

	// Copy the new filter chain.
	return_if_error(lzma_filters_copy(
			filters, coder->filters, allocator));

	// Index
	lzma_index_end(coder->index, allocator);
	coder->index = lzma_index_init(allocator);
	if (coder->index == NULL)
		return LZMA_MEM_ERROR;

	// Stream Header
	coder->stream_flags.version = 0;
	coder->stream_flags.check = options->check;
	return_if_error(lzma_stream_header_encode(
			&coder->stream_flags, coder->header));

	coder->header_pos = 0;

	// Progress info: the Stream Header counts as already-produced output.
	coder->progress_in = 0;
	coder->progress_out = LZMA_STREAM_HEADER_SIZE;

	return LZMA_OK;
}
1169
1170
1171 #ifdef HAVE_SYMBOL_VERSIONS_LINUX
1172 // These are for compatibility with binaries linked against liblzma that
1173 // has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
1174 // Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2
1175 // but it has been added here anyway since someone might misread the
1176 // RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist.
1177 LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha",
1178 lzma_ret, lzma_stream_encoder_mt_512a)(
1179 lzma_stream *strm, const lzma_mt *options)
1180 lzma_nothrow lzma_attr_warn_unused_result
1181 __attribute__((__alias__("lzma_stream_encoder_mt_52")));
1182
1183 LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2",
1184 lzma_ret, lzma_stream_encoder_mt_522)(
1185 lzma_stream *strm, const lzma_mt *options)
1186 lzma_nothrow lzma_attr_warn_unused_result
1187 __attribute__((__alias__("lzma_stream_encoder_mt_52")));
1188
1189 LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2",
1190 lzma_ret, lzma_stream_encoder_mt_52)(
1191 lzma_stream *strm, const lzma_mt *options)
1192 lzma_nothrow lzma_attr_warn_unused_result;
1193
1194 #define lzma_stream_encoder_mt lzma_stream_encoder_mt_52
1195 #endif
1196 extern LZMA_API(lzma_ret)
lzma_stream_encoder_mt(lzma_stream * strm,const lzma_mt * options)1197 lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1198 {
1199 lzma_next_strm_init(stream_encoder_mt_init, strm, options);
1200
1201 strm->internal->supported_actions[LZMA_RUN] = true;
1202 // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1203 strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
1204 strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
1205 strm->internal->supported_actions[LZMA_FINISH] = true;
1206
1207 return LZMA_OK;
1208 }
1209
1210
1211 #ifdef HAVE_SYMBOL_VERSIONS_LINUX
1212 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha",
1213 uint64_t, lzma_stream_encoder_mt_memusage_512a)(
1214 const lzma_mt *options) lzma_nothrow lzma_attr_pure
1215 __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
1216
1217 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2",
1218 uint64_t, lzma_stream_encoder_mt_memusage_522)(
1219 const lzma_mt *options) lzma_nothrow lzma_attr_pure
1220 __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
1221
1222 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2",
1223 uint64_t, lzma_stream_encoder_mt_memusage_52)(
1224 const lzma_mt *options) lzma_nothrow lzma_attr_pure;
1225
1226 #define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52
1227 #endif
1228 // This function name is a monster but it's consistent with the older
1229 // monster names. :-( 31 chars is the max that C99 requires so in that
1230 // sense it's not too long. ;-)
1231 extern LZMA_API(uint64_t)
lzma_stream_encoder_mt_memusage(const lzma_mt * options)1232 lzma_stream_encoder_mt_memusage(const lzma_mt *options)
1233 {
1234 lzma_options_easy easy;
1235 const lzma_filter *filters;
1236 uint64_t block_size;
1237 uint64_t outbuf_size_max;
1238
1239 if (get_options(options, &easy, &filters, &block_size,
1240 &outbuf_size_max) != LZMA_OK)
1241 return UINT64_MAX;
1242
1243 // Memory usage of the input buffers
1244 const uint64_t inbuf_memusage = options->threads * block_size;
1245
1246 // Memory usage of the filter encoders
1247 uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1248 if (filters_memusage == UINT64_MAX)
1249 return UINT64_MAX;
1250
1251 filters_memusage *= options->threads;
1252
1253 // Memory usage of the output queue
1254 const uint64_t outq_memusage = lzma_outq_memusage(
1255 outbuf_size_max, options->threads);
1256 if (outq_memusage == UINT64_MAX)
1257 return UINT64_MAX;
1258
1259 // Sum them with overflow checking.
1260 uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1261 + sizeof(lzma_stream_coder)
1262 + options->threads * sizeof(worker_thread);
1263
1264 if (UINT64_MAX - total_memusage < inbuf_memusage)
1265 return UINT64_MAX;
1266
1267 total_memusage += inbuf_memusage;
1268
1269 if (UINT64_MAX - total_memusage < filters_memusage)
1270 return UINT64_MAX;
1271
1272 total_memusage += filters_memusage;
1273
1274 if (UINT64_MAX - total_memusage < outq_memusage)
1275 return UINT64_MAX;
1276
1277 return total_memusage + outq_memusage;
1278 }
1279