1*3b35e7eeSXin LI // SPDX-License-Identifier: 0BSD 2*3b35e7eeSXin LI 353200025SRui Paulo /////////////////////////////////////////////////////////////////////////////// 453200025SRui Paulo // 553200025SRui Paulo /// \file outqueue.c 653200025SRui Paulo /// \brief Output queue handling in multithreaded coding 753200025SRui Paulo // 853200025SRui Paulo // Author: Lasse Collin 953200025SRui Paulo // 1053200025SRui Paulo /////////////////////////////////////////////////////////////////////////////// 1153200025SRui Paulo 1253200025SRui Paulo #include "outqueue.h" 1353200025SRui Paulo 1453200025SRui Paulo 1573ed8e77SXin LI /// Get the maximum number of buffers that may be allocated based 1673ed8e77SXin LI /// on the number of threads. For now this is twice the number of threads. 1773ed8e77SXin LI /// It's a compromise between RAM usage and keeping the worker threads busy 1873ed8e77SXin LI /// when buffers finish out of order. 1973ed8e77SXin LI #define GET_BUFS_LIMIT(threads) (2 * (threads)) 2053200025SRui Paulo 2153200025SRui Paulo 2253200025SRui Paulo extern uint64_t 2353200025SRui Paulo lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads) 2453200025SRui Paulo { 2573ed8e77SXin LI // This is to ease integer overflow checking: We may allocate up to 2673ed8e77SXin LI // GET_BUFS_LIMIT(LZMA_THREADS_MAX) buffers and we need some extra 2773ed8e77SXin LI // memory for other data structures too (that's the /2). 2873ed8e77SXin LI // 2973ed8e77SXin LI // lzma_outq_prealloc_buf() will still accept bigger buffers than this. 3073ed8e77SXin LI const uint64_t limit 3173ed8e77SXin LI = UINT64_MAX / GET_BUFS_LIMIT(LZMA_THREADS_MAX) / 2; 3253200025SRui Paulo 3373ed8e77SXin LI if (threads > LZMA_THREADS_MAX || buf_size_max > limit) 3453200025SRui Paulo return UINT64_MAX; 3553200025SRui Paulo 3673ed8e77SXin LI return GET_BUFS_LIMIT(threads) 3773ed8e77SXin LI * lzma_outq_outbuf_memusage(buf_size_max); 3873ed8e77SXin LI } 3973ed8e77SXin LI 4073ed8e77SXin LI 4173ed8e77SXin LI static void 4273ed8e77SXin LI move_head_to_cache(lzma_outq *outq, const lzma_allocator *allocator) 4373ed8e77SXin LI { 4473ed8e77SXin LI assert(outq->head != NULL); 4573ed8e77SXin LI assert(outq->tail != NULL); 4673ed8e77SXin LI assert(outq->bufs_in_use > 0); 4773ed8e77SXin LI 4873ed8e77SXin LI lzma_outbuf *buf = outq->head; 4973ed8e77SXin LI outq->head = buf->next; 5073ed8e77SXin LI if (outq->head == NULL) 5173ed8e77SXin LI outq->tail = NULL; 5273ed8e77SXin LI 5373ed8e77SXin LI if (outq->cache != NULL && outq->cache->allocated != buf->allocated) 5473ed8e77SXin LI lzma_outq_clear_cache(outq, allocator); 5573ed8e77SXin LI 5673ed8e77SXin LI buf->next = outq->cache; 5773ed8e77SXin LI outq->cache = buf; 5873ed8e77SXin LI 5973ed8e77SXin LI --outq->bufs_in_use; 6073ed8e77SXin LI outq->mem_in_use -= lzma_outq_outbuf_memusage(buf->allocated); 6173ed8e77SXin LI 6273ed8e77SXin LI return; 6373ed8e77SXin LI } 6473ed8e77SXin LI 6573ed8e77SXin LI 6673ed8e77SXin LI static void 6773ed8e77SXin LI free_one_cached_buffer(lzma_outq *outq, const lzma_allocator *allocator) 6873ed8e77SXin LI { 6973ed8e77SXin LI assert(outq->cache != NULL); 7073ed8e77SXin LI 7173ed8e77SXin LI lzma_outbuf *buf = outq->cache; 7273ed8e77SXin LI outq->cache = buf->next; 7373ed8e77SXin LI 7473ed8e77SXin LI --outq->bufs_allocated; 7573ed8e77SXin LI outq->mem_allocated -= lzma_outq_outbuf_memusage(buf->allocated); 7673ed8e77SXin LI 7773ed8e77SXin LI lzma_free(buf, allocator); 7873ed8e77SXin LI return; 7973ed8e77SXin LI } 8073ed8e77SXin LI 8173ed8e77SXin LI 8273ed8e77SXin LI extern void 8373ed8e77SXin LI lzma_outq_clear_cache(lzma_outq *outq, const lzma_allocator *allocator) 8473ed8e77SXin LI { 8573ed8e77SXin LI while (outq->cache != NULL) 8673ed8e77SXin LI free_one_cached_buffer(outq, allocator); 8773ed8e77SXin LI 8873ed8e77SXin LI return; 8973ed8e77SXin LI } 9073ed8e77SXin LI 9173ed8e77SXin LI 9273ed8e77SXin LI extern void 9373ed8e77SXin LI lzma_outq_clear_cache2(lzma_outq *outq, const lzma_allocator *allocator, 9473ed8e77SXin LI size_t keep_size) 9573ed8e77SXin LI { 9673ed8e77SXin LI if (outq->cache == NULL) 9773ed8e77SXin LI return; 9873ed8e77SXin LI 9973ed8e77SXin LI // Free all but one. 10073ed8e77SXin LI while (outq->cache->next != NULL) 10173ed8e77SXin LI free_one_cached_buffer(outq, allocator); 10273ed8e77SXin LI 10373ed8e77SXin LI // Free the last one only if its size doesn't equal to keep_size. 10473ed8e77SXin LI if (outq->cache->allocated != keep_size) 10573ed8e77SXin LI free_one_cached_buffer(outq, allocator); 10673ed8e77SXin LI 10773ed8e77SXin LI return; 10853200025SRui Paulo } 10953200025SRui Paulo 11053200025SRui Paulo 11153200025SRui Paulo extern lzma_ret 11253200025SRui Paulo lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator, 11373ed8e77SXin LI uint32_t threads) 11453200025SRui Paulo { 11573ed8e77SXin LI if (threads > LZMA_THREADS_MAX) 11673ed8e77SXin LI return LZMA_OPTIONS_ERROR; 11753200025SRui Paulo 11873ed8e77SXin LI const uint32_t bufs_limit = GET_BUFS_LIMIT(threads); 11953200025SRui Paulo 12073ed8e77SXin LI // Clear head/tail. 12173ed8e77SXin LI while (outq->head != NULL) 12273ed8e77SXin LI move_head_to_cache(outq, allocator); 12353200025SRui Paulo 12473ed8e77SXin LI // If new buf_limit is lower than the old one, we may need to free 12573ed8e77SXin LI // a few cached buffers. 12673ed8e77SXin LI while (bufs_limit < outq->bufs_allocated) 12773ed8e77SXin LI free_one_cached_buffer(outq, allocator); 12853200025SRui Paulo 12973ed8e77SXin LI outq->bufs_limit = bufs_limit; 13053200025SRui Paulo outq->read_pos = 0; 13153200025SRui Paulo 13253200025SRui Paulo return LZMA_OK; 13353200025SRui Paulo } 13453200025SRui Paulo 13553200025SRui Paulo 13653200025SRui Paulo extern void 13753200025SRui Paulo lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator) 13853200025SRui Paulo { 13973ed8e77SXin LI while (outq->head != NULL) 14073ed8e77SXin LI move_head_to_cache(outq, allocator); 14153200025SRui Paulo 14273ed8e77SXin LI lzma_outq_clear_cache(outq, allocator); 14353200025SRui Paulo return; 14453200025SRui Paulo } 14553200025SRui Paulo 14653200025SRui Paulo 14773ed8e77SXin LI extern lzma_ret 14873ed8e77SXin LI lzma_outq_prealloc_buf(lzma_outq *outq, const lzma_allocator *allocator, 14973ed8e77SXin LI size_t size) 15053200025SRui Paulo { 15153200025SRui Paulo // Caller must have checked it with lzma_outq_has_buf(). 15273ed8e77SXin LI assert(outq->bufs_in_use < outq->bufs_limit); 15353200025SRui Paulo 15473ed8e77SXin LI // If there already is appropriately-sized buffer in the cache, 15573ed8e77SXin LI // we need to do nothing. 15673ed8e77SXin LI if (outq->cache != NULL && outq->cache->allocated == size) 15773ed8e77SXin LI return LZMA_OK; 15873ed8e77SXin LI 15973ed8e77SXin LI if (size > SIZE_MAX - sizeof(lzma_outbuf)) 16073ed8e77SXin LI return LZMA_MEM_ERROR; 16173ed8e77SXin LI 16273ed8e77SXin LI const size_t alloc_size = lzma_outq_outbuf_memusage(size); 16373ed8e77SXin LI 16473ed8e77SXin LI // The cache may have buffers but their size is wrong. 16573ed8e77SXin LI lzma_outq_clear_cache(outq, allocator); 16673ed8e77SXin LI 16773ed8e77SXin LI outq->cache = lzma_alloc(alloc_size, allocator); 16873ed8e77SXin LI if (outq->cache == NULL) 16973ed8e77SXin LI return LZMA_MEM_ERROR; 17073ed8e77SXin LI 17173ed8e77SXin LI outq->cache->next = NULL; 17273ed8e77SXin LI outq->cache->allocated = size; 17373ed8e77SXin LI 17473ed8e77SXin LI ++outq->bufs_allocated; 17573ed8e77SXin LI outq->mem_allocated += alloc_size; 17673ed8e77SXin LI 17773ed8e77SXin LI return LZMA_OK; 17873ed8e77SXin LI } 17973ed8e77SXin LI 18073ed8e77SXin LI 18173ed8e77SXin LI extern lzma_outbuf * 18273ed8e77SXin LI lzma_outq_get_buf(lzma_outq *outq, void *worker) 18373ed8e77SXin LI { 18473ed8e77SXin LI // Caller must have used lzma_outq_prealloc_buf() to ensure these. 18573ed8e77SXin LI assert(outq->bufs_in_use < outq->bufs_limit); 18673ed8e77SXin LI assert(outq->bufs_in_use < outq->bufs_allocated); 18773ed8e77SXin LI assert(outq->cache != NULL); 18873ed8e77SXin LI 18973ed8e77SXin LI lzma_outbuf *buf = outq->cache; 19073ed8e77SXin LI outq->cache = buf->next; 19173ed8e77SXin LI buf->next = NULL; 19273ed8e77SXin LI 19373ed8e77SXin LI if (outq->tail != NULL) { 19473ed8e77SXin LI assert(outq->head != NULL); 19573ed8e77SXin LI outq->tail->next = buf; 19673ed8e77SXin LI } else { 19773ed8e77SXin LI assert(outq->head == NULL); 19873ed8e77SXin LI outq->head = buf; 19973ed8e77SXin LI } 20073ed8e77SXin LI 20173ed8e77SXin LI outq->tail = buf; 20273ed8e77SXin LI 20373ed8e77SXin LI buf->worker = worker; 20453200025SRui Paulo buf->finished = false; 20573ed8e77SXin LI buf->finish_ret = LZMA_STREAM_END; 20673ed8e77SXin LI buf->pos = 0; 20773ed8e77SXin LI buf->decoder_in_pos = 0; 20853200025SRui Paulo 20973ed8e77SXin LI buf->unpadded_size = 0; 21073ed8e77SXin LI buf->uncompressed_size = 0; 21153200025SRui Paulo 21273ed8e77SXin LI ++outq->bufs_in_use; 21373ed8e77SXin LI outq->mem_in_use += lzma_outq_outbuf_memusage(buf->allocated); 21453200025SRui Paulo 21553200025SRui Paulo return buf; 21653200025SRui Paulo } 21753200025SRui Paulo 21853200025SRui Paulo 21953200025SRui Paulo extern bool 22053200025SRui Paulo lzma_outq_is_readable(const lzma_outq *outq) 22153200025SRui Paulo { 22273ed8e77SXin LI if (outq->head == NULL) 22373ed8e77SXin LI return false; 22453200025SRui Paulo 22573ed8e77SXin LI return outq->read_pos < outq->head->pos || outq->head->finished; 22653200025SRui Paulo } 22753200025SRui Paulo 22853200025SRui Paulo 22953200025SRui Paulo extern lzma_ret 23073ed8e77SXin LI lzma_outq_read(lzma_outq *restrict outq, 23173ed8e77SXin LI const lzma_allocator *restrict allocator, 23273ed8e77SXin LI uint8_t *restrict out, size_t *restrict out_pos, 23373ed8e77SXin LI size_t out_size, 23453200025SRui Paulo lzma_vli *restrict unpadded_size, 23553200025SRui Paulo lzma_vli *restrict uncompressed_size) 23653200025SRui Paulo { 23753200025SRui Paulo // There must be at least one buffer from which to read. 23873ed8e77SXin LI if (outq->bufs_in_use == 0) 23953200025SRui Paulo return LZMA_OK; 24053200025SRui Paulo 24153200025SRui Paulo // Get the buffer. 24273ed8e77SXin LI lzma_outbuf *buf = outq->head; 24353200025SRui Paulo 24453200025SRui Paulo // Copy from the buffer to output. 24573ed8e77SXin LI // 24673ed8e77SXin LI // FIXME? In threaded decoder it may be bad to do this copy while 24773ed8e77SXin LI // the mutex is being held. 24873ed8e77SXin LI lzma_bufcpy(buf->buf, &outq->read_pos, buf->pos, 24953200025SRui Paulo out, out_pos, out_size); 25053200025SRui Paulo 25153200025SRui Paulo // Return if we didn't get all the data from the buffer. 25273ed8e77SXin LI if (!buf->finished || outq->read_pos < buf->pos) 25353200025SRui Paulo return LZMA_OK; 25453200025SRui Paulo 25553200025SRui Paulo // The buffer was finished. Tell the caller its size information. 25673ed8e77SXin LI if (unpadded_size != NULL) 25753200025SRui Paulo *unpadded_size = buf->unpadded_size; 25873ed8e77SXin LI 25973ed8e77SXin LI if (uncompressed_size != NULL) 26053200025SRui Paulo *uncompressed_size = buf->uncompressed_size; 26153200025SRui Paulo 26273ed8e77SXin LI // Remember the return value. 26373ed8e77SXin LI const lzma_ret finish_ret = buf->finish_ret; 26473ed8e77SXin LI 26553200025SRui Paulo // Free this buffer for further use. 26673ed8e77SXin LI move_head_to_cache(outq, allocator); 26753200025SRui Paulo outq->read_pos = 0; 26853200025SRui Paulo 26973ed8e77SXin LI return finish_ret; 27073ed8e77SXin LI } 27173ed8e77SXin LI 27273ed8e77SXin LI 27373ed8e77SXin LI extern void 27473ed8e77SXin LI lzma_outq_enable_partial_output(lzma_outq *outq, 27573ed8e77SXin LI void (*enable_partial_output)(void *worker)) 27673ed8e77SXin LI { 27773ed8e77SXin LI if (outq->head != NULL && !outq->head->finished 27873ed8e77SXin LI && outq->head->worker != NULL) { 27973ed8e77SXin LI enable_partial_output(outq->head->worker); 28073ed8e77SXin LI 28173ed8e77SXin LI // Set it to NULL since calling it twice is pointless. 28273ed8e77SXin LI outq->head->worker = NULL; 28373ed8e77SXin LI } 28473ed8e77SXin LI 28573ed8e77SXin LI return; 28653200025SRui Paulo } 287