1 // SPDX-License-Identifier: 0BSD 2 3 /////////////////////////////////////////////////////////////////////////////// 4 // 5 /// \file outqueue.c 6 /// \brief Output queue handling in multithreaded coding 7 // 8 // Author: Lasse Collin 9 // 10 /////////////////////////////////////////////////////////////////////////////// 11 12 #include "outqueue.h" 13 14 15 /// Get the maximum number of buffers that may be allocated based 16 /// on the number of threads. For now this is twice the number of threads. 17 /// It's a compromise between RAM usage and keeping the worker threads busy 18 /// when buffers finish out of order. 19 #define GET_BUFS_LIMIT(threads) (2 * (threads)) 20 21 22 extern uint64_t 23 lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads) 24 { 25 // This is to ease integer overflow checking: We may allocate up to 26 // GET_BUFS_LIMIT(LZMA_THREADS_MAX) buffers and we need some extra 27 // memory for other data structures too (that's the /2). 28 // 29 // lzma_outq_prealloc_buf() will still accept bigger buffers than this. 30 const uint64_t limit 31 = UINT64_MAX / GET_BUFS_LIMIT(LZMA_THREADS_MAX) / 2; 32 33 if (threads > LZMA_THREADS_MAX || buf_size_max > limit) 34 return UINT64_MAX; 35 36 return GET_BUFS_LIMIT(threads) 37 * lzma_outq_outbuf_memusage(buf_size_max); 38 } 39 40 41 static void 42 move_head_to_cache(lzma_outq *outq, const lzma_allocator *allocator) 43 { 44 assert(outq->head != NULL); 45 assert(outq->tail != NULL); 46 assert(outq->bufs_in_use > 0); 47 48 lzma_outbuf *buf = outq->head; 49 outq->head = buf->next; 50 if (outq->head == NULL) 51 outq->tail = NULL; 52 53 if (outq->cache != NULL && outq->cache->allocated != buf->allocated) 54 lzma_outq_clear_cache(outq, allocator); 55 56 buf->next = outq->cache; 57 outq->cache = buf; 58 59 --outq->bufs_in_use; 60 outq->mem_in_use -= lzma_outq_outbuf_memusage(buf->allocated); 61 62 return; 63 } 64 65 66 static void 67 free_one_cached_buffer(lzma_outq *outq, const lzma_allocator *allocator) 68 { 69 assert(outq->cache != NULL); 70 71 lzma_outbuf *buf = outq->cache; 72 outq->cache = buf->next; 73 74 --outq->bufs_allocated; 75 outq->mem_allocated -= lzma_outq_outbuf_memusage(buf->allocated); 76 77 lzma_free(buf, allocator); 78 return; 79 } 80 81 82 extern void 83 lzma_outq_clear_cache(lzma_outq *outq, const lzma_allocator *allocator) 84 { 85 while (outq->cache != NULL) 86 free_one_cached_buffer(outq, allocator); 87 88 return; 89 } 90 91 92 extern void 93 lzma_outq_clear_cache2(lzma_outq *outq, const lzma_allocator *allocator, 94 size_t keep_size) 95 { 96 if (outq->cache == NULL) 97 return; 98 99 // Free all but one. 100 while (outq->cache->next != NULL) 101 free_one_cached_buffer(outq, allocator); 102 103 // Free the last one only if its size doesn't equal to keep_size. 104 if (outq->cache->allocated != keep_size) 105 free_one_cached_buffer(outq, allocator); 106 107 return; 108 } 109 110 111 extern lzma_ret 112 lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator, 113 uint32_t threads) 114 { 115 if (threads > LZMA_THREADS_MAX) 116 return LZMA_OPTIONS_ERROR; 117 118 const uint32_t bufs_limit = GET_BUFS_LIMIT(threads); 119 120 // Clear head/tail. 121 while (outq->head != NULL) 122 move_head_to_cache(outq, allocator); 123 124 // If new buf_limit is lower than the old one, we may need to free 125 // a few cached buffers. 126 while (bufs_limit < outq->bufs_allocated) 127 free_one_cached_buffer(outq, allocator); 128 129 outq->bufs_limit = bufs_limit; 130 outq->read_pos = 0; 131 132 return LZMA_OK; 133 } 134 135 136 extern void 137 lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator) 138 { 139 while (outq->head != NULL) 140 move_head_to_cache(outq, allocator); 141 142 lzma_outq_clear_cache(outq, allocator); 143 return; 144 } 145 146 147 extern lzma_ret 148 lzma_outq_prealloc_buf(lzma_outq *outq, const lzma_allocator *allocator, 149 size_t size) 150 { 151 // Caller must have checked it with lzma_outq_has_buf(). 152 assert(outq->bufs_in_use < outq->bufs_limit); 153 154 // If there already is appropriately-sized buffer in the cache, 155 // we need to do nothing. 156 if (outq->cache != NULL && outq->cache->allocated == size) 157 return LZMA_OK; 158 159 if (size > SIZE_MAX - sizeof(lzma_outbuf)) 160 return LZMA_MEM_ERROR; 161 162 const size_t alloc_size = lzma_outq_outbuf_memusage(size); 163 164 // The cache may have buffers but their size is wrong. 165 lzma_outq_clear_cache(outq, allocator); 166 167 outq->cache = lzma_alloc(alloc_size, allocator); 168 if (outq->cache == NULL) 169 return LZMA_MEM_ERROR; 170 171 outq->cache->next = NULL; 172 outq->cache->allocated = size; 173 174 ++outq->bufs_allocated; 175 outq->mem_allocated += alloc_size; 176 177 return LZMA_OK; 178 } 179 180 181 extern lzma_outbuf * 182 lzma_outq_get_buf(lzma_outq *outq, void *worker) 183 { 184 // Caller must have used lzma_outq_prealloc_buf() to ensure these. 185 assert(outq->bufs_in_use < outq->bufs_limit); 186 assert(outq->bufs_in_use < outq->bufs_allocated); 187 assert(outq->cache != NULL); 188 189 lzma_outbuf *buf = outq->cache; 190 outq->cache = buf->next; 191 buf->next = NULL; 192 193 if (outq->tail != NULL) { 194 assert(outq->head != NULL); 195 outq->tail->next = buf; 196 } else { 197 assert(outq->head == NULL); 198 outq->head = buf; 199 } 200 201 outq->tail = buf; 202 203 buf->worker = worker; 204 buf->finished = false; 205 buf->finish_ret = LZMA_STREAM_END; 206 buf->pos = 0; 207 buf->decoder_in_pos = 0; 208 209 buf->unpadded_size = 0; 210 buf->uncompressed_size = 0; 211 212 ++outq->bufs_in_use; 213 outq->mem_in_use += lzma_outq_outbuf_memusage(buf->allocated); 214 215 return buf; 216 } 217 218 219 extern bool 220 lzma_outq_is_readable(const lzma_outq *outq) 221 { 222 if (outq->head == NULL) 223 return false; 224 225 return outq->read_pos < outq->head->pos || outq->head->finished; 226 } 227 228 229 extern lzma_ret 230 lzma_outq_read(lzma_outq *restrict outq, 231 const lzma_allocator *restrict allocator, 232 uint8_t *restrict out, size_t *restrict out_pos, 233 size_t out_size, 234 lzma_vli *restrict unpadded_size, 235 lzma_vli *restrict uncompressed_size) 236 { 237 // There must be at least one buffer from which to read. 238 if (outq->bufs_in_use == 0) 239 return LZMA_OK; 240 241 // Get the buffer. 242 lzma_outbuf *buf = outq->head; 243 244 // Copy from the buffer to output. 245 // 246 // FIXME? In threaded decoder it may be bad to do this copy while 247 // the mutex is being held. 248 lzma_bufcpy(buf->buf, &outq->read_pos, buf->pos, 249 out, out_pos, out_size); 250 251 // Return if we didn't get all the data from the buffer. 252 if (!buf->finished || outq->read_pos < buf->pos) 253 return LZMA_OK; 254 255 // The buffer was finished. Tell the caller its size information. 256 if (unpadded_size != NULL) 257 *unpadded_size = buf->unpadded_size; 258 259 if (uncompressed_size != NULL) 260 *uncompressed_size = buf->uncompressed_size; 261 262 // Remember the return value. 263 const lzma_ret finish_ret = buf->finish_ret; 264 265 // Free this buffer for further use. 266 move_head_to_cache(outq, allocator); 267 outq->read_pos = 0; 268 269 return finish_ret; 270 } 271 272 273 extern void 274 lzma_outq_enable_partial_output(lzma_outq *outq, 275 void (*enable_partial_output)(void *worker)) 276 { 277 if (outq->head != NULL && !outq->head->finished 278 && outq->head->worker != NULL) { 279 enable_partial_output(outq->head->worker); 280 281 // Set it to NULL since calling it twice is pointless. 282 outq->head->worker = NULL; 283 } 284 285 return; 286 } 287