1 /////////////////////////////////////////////////////////////////////////////// 2 // 3 /// \file outqueue.c 4 /// \brief Output queue handling in multithreaded coding 5 // 6 // Author: Lasse Collin 7 // 8 // This file has been put into the public domain. 9 // You can do whatever you want with this file. 10 // 11 /////////////////////////////////////////////////////////////////////////////// 12 13 #include "outqueue.h" 14 15 16 /// Get the maximum number of buffers that may be allocated based 17 /// on the number of threads. For now this is twice the number of threads. 18 /// It's a compromise between RAM usage and keeping the worker threads busy 19 /// when buffers finish out of order. 20 #define GET_BUFS_LIMIT(threads) (2 * (threads)) 21 22 23 extern uint64_t 24 lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads) 25 { 26 // This is to ease integer overflow checking: We may allocate up to 27 // GET_BUFS_LIMIT(LZMA_THREADS_MAX) buffers and we need some extra 28 // memory for other data structures too (that's the /2). 29 // 30 // lzma_outq_prealloc_buf() will still accept bigger buffers than this. 31 const uint64_t limit 32 = UINT64_MAX / GET_BUFS_LIMIT(LZMA_THREADS_MAX) / 2; 33 34 if (threads > LZMA_THREADS_MAX || buf_size_max > limit) 35 return UINT64_MAX; 36 37 return GET_BUFS_LIMIT(threads) 38 * lzma_outq_outbuf_memusage(buf_size_max); 39 } 40 41 42 static void 43 move_head_to_cache(lzma_outq *outq, const lzma_allocator *allocator) 44 { 45 assert(outq->head != NULL); 46 assert(outq->tail != NULL); 47 assert(outq->bufs_in_use > 0); 48 49 lzma_outbuf *buf = outq->head; 50 outq->head = buf->next; 51 if (outq->head == NULL) 52 outq->tail = NULL; 53 54 if (outq->cache != NULL && outq->cache->allocated != buf->allocated) 55 lzma_outq_clear_cache(outq, allocator); 56 57 buf->next = outq->cache; 58 outq->cache = buf; 59 60 --outq->bufs_in_use; 61 outq->mem_in_use -= lzma_outq_outbuf_memusage(buf->allocated); 62 63 return; 64 } 65 66 67 static void 68 free_one_cached_buffer(lzma_outq *outq, const lzma_allocator *allocator) 69 { 70 assert(outq->cache != NULL); 71 72 lzma_outbuf *buf = outq->cache; 73 outq->cache = buf->next; 74 75 --outq->bufs_allocated; 76 outq->mem_allocated -= lzma_outq_outbuf_memusage(buf->allocated); 77 78 lzma_free(buf, allocator); 79 return; 80 } 81 82 83 extern void 84 lzma_outq_clear_cache(lzma_outq *outq, const lzma_allocator *allocator) 85 { 86 while (outq->cache != NULL) 87 free_one_cached_buffer(outq, allocator); 88 89 return; 90 } 91 92 93 extern void 94 lzma_outq_clear_cache2(lzma_outq *outq, const lzma_allocator *allocator, 95 size_t keep_size) 96 { 97 if (outq->cache == NULL) 98 return; 99 100 // Free all but one. 101 while (outq->cache->next != NULL) 102 free_one_cached_buffer(outq, allocator); 103 104 // Free the last one only if its size doesn't equal to keep_size. 105 if (outq->cache->allocated != keep_size) 106 free_one_cached_buffer(outq, allocator); 107 108 return; 109 } 110 111 112 extern lzma_ret 113 lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator, 114 uint32_t threads) 115 { 116 if (threads > LZMA_THREADS_MAX) 117 return LZMA_OPTIONS_ERROR; 118 119 const uint32_t bufs_limit = GET_BUFS_LIMIT(threads); 120 121 // Clear head/tail. 122 while (outq->head != NULL) 123 move_head_to_cache(outq, allocator); 124 125 // If new buf_limit is lower than the old one, we may need to free 126 // a few cached buffers. 127 while (bufs_limit < outq->bufs_allocated) 128 free_one_cached_buffer(outq, allocator); 129 130 outq->bufs_limit = bufs_limit; 131 outq->read_pos = 0; 132 133 return LZMA_OK; 134 } 135 136 137 extern void 138 lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator) 139 { 140 while (outq->head != NULL) 141 move_head_to_cache(outq, allocator); 142 143 lzma_outq_clear_cache(outq, allocator); 144 return; 145 } 146 147 148 extern lzma_ret 149 lzma_outq_prealloc_buf(lzma_outq *outq, const lzma_allocator *allocator, 150 size_t size) 151 { 152 // Caller must have checked it with lzma_outq_has_buf(). 153 assert(outq->bufs_in_use < outq->bufs_limit); 154 155 // If there already is appropriately-sized buffer in the cache, 156 // we need to do nothing. 157 if (outq->cache != NULL && outq->cache->allocated == size) 158 return LZMA_OK; 159 160 if (size > SIZE_MAX - sizeof(lzma_outbuf)) 161 return LZMA_MEM_ERROR; 162 163 const size_t alloc_size = lzma_outq_outbuf_memusage(size); 164 165 // The cache may have buffers but their size is wrong. 166 lzma_outq_clear_cache(outq, allocator); 167 168 outq->cache = lzma_alloc(alloc_size, allocator); 169 if (outq->cache == NULL) 170 return LZMA_MEM_ERROR; 171 172 outq->cache->next = NULL; 173 outq->cache->allocated = size; 174 175 ++outq->bufs_allocated; 176 outq->mem_allocated += alloc_size; 177 178 return LZMA_OK; 179 } 180 181 182 extern lzma_outbuf * 183 lzma_outq_get_buf(lzma_outq *outq, void *worker) 184 { 185 // Caller must have used lzma_outq_prealloc_buf() to ensure these. 186 assert(outq->bufs_in_use < outq->bufs_limit); 187 assert(outq->bufs_in_use < outq->bufs_allocated); 188 assert(outq->cache != NULL); 189 190 lzma_outbuf *buf = outq->cache; 191 outq->cache = buf->next; 192 buf->next = NULL; 193 194 if (outq->tail != NULL) { 195 assert(outq->head != NULL); 196 outq->tail->next = buf; 197 } else { 198 assert(outq->head == NULL); 199 outq->head = buf; 200 } 201 202 outq->tail = buf; 203 204 buf->worker = worker; 205 buf->finished = false; 206 buf->finish_ret = LZMA_STREAM_END; 207 buf->pos = 0; 208 buf->decoder_in_pos = 0; 209 210 buf->unpadded_size = 0; 211 buf->uncompressed_size = 0; 212 213 ++outq->bufs_in_use; 214 outq->mem_in_use += lzma_outq_outbuf_memusage(buf->allocated); 215 216 return buf; 217 } 218 219 220 extern bool 221 lzma_outq_is_readable(const lzma_outq *outq) 222 { 223 if (outq->head == NULL) 224 return false; 225 226 return outq->read_pos < outq->head->pos || outq->head->finished; 227 } 228 229 230 extern lzma_ret 231 lzma_outq_read(lzma_outq *restrict outq, 232 const lzma_allocator *restrict allocator, 233 uint8_t *restrict out, size_t *restrict out_pos, 234 size_t out_size, 235 lzma_vli *restrict unpadded_size, 236 lzma_vli *restrict uncompressed_size) 237 { 238 // There must be at least one buffer from which to read. 239 if (outq->bufs_in_use == 0) 240 return LZMA_OK; 241 242 // Get the buffer. 243 lzma_outbuf *buf = outq->head; 244 245 // Copy from the buffer to output. 246 // 247 // FIXME? In threaded decoder it may be bad to do this copy while 248 // the mutex is being held. 249 lzma_bufcpy(buf->buf, &outq->read_pos, buf->pos, 250 out, out_pos, out_size); 251 252 // Return if we didn't get all the data from the buffer. 253 if (!buf->finished || outq->read_pos < buf->pos) 254 return LZMA_OK; 255 256 // The buffer was finished. Tell the caller its size information. 257 if (unpadded_size != NULL) 258 *unpadded_size = buf->unpadded_size; 259 260 if (uncompressed_size != NULL) 261 *uncompressed_size = buf->uncompressed_size; 262 263 // Remember the return value. 264 const lzma_ret finish_ret = buf->finish_ret; 265 266 // Free this buffer for further use. 267 move_head_to_cache(outq, allocator); 268 outq->read_pos = 0; 269 270 return finish_ret; 271 } 272 273 274 extern void 275 lzma_outq_enable_partial_output(lzma_outq *outq, 276 void (*enable_partial_output)(void *worker)) 277 { 278 if (outq->head != NULL && !outq->head->finished 279 && outq->head->worker != NULL) { 280 enable_partial_output(outq->head->worker); 281 282 // Set it to NULL since calling it twice is pointless. 283 outq->head->worker = NULL; 284 } 285 286 return; 287 } 288