/*
 * Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */


/* ======   Tuning parameters   ====== */
#define ZSTDMT_NBTHREADS_MAX 200
#define ZSTDMT_OVERLAPLOG_DEFAULT 6


/* ======   Compiler specifics   ====== */
#if defined(_MSC_VER)
#  pragma warning(disable : 4204)   /* disable: C4204: non-constant aggregate initializer */
#endif


/* ======   Dependencies   ====== */
#include <string.h>         /* memcpy, memset */
#include "pool.h"           /* threadpool */
#include "threading.h"      /* mutex */
#include "zstd_internal.h"  /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
#include "zstdmt_compress.h"


/* ======   Debug   ====== */
#if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2)

#  include <stdio.h>
#  include <unistd.h>
#  include <sys/times.h>
#  define DEBUGLOGRAW(l, ...) if (l<=ZSTD_DEBUG) { fprintf(stderr, __VA_ARGS__); }

#  define DEBUG_PRINTHEX(l,p,n) {                 \
    unsigned debug_u;                             \
    for (debug_u=0; debug_u<(n); debug_u++)       \
        DEBUGLOGRAW(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \
    DEBUGLOGRAW(l, " \n");                        \
}

static unsigned long long GetCurrentClockTimeMicroseconds(void)
{
    static clock_t _ticksPerSecond = 0;
    if (_ticksPerSecond <= 0) _ticksPerSecond = sysconf(_SC_CLK_TCK);

    {   struct tms junk; clock_t newTicks = (clock_t) times(&junk);
        return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond);  }
}

#define MUTEX_WAIT_TIME_DLEVEL 6
#define ZSTD_PTHREAD_MUTEX_LOCK(mutex) {          \
    if (ZSTD_DEBUG >= MUTEX_WAIT_TIME_DLEVEL) {   \
        unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
        ZSTD_pthread_mutex_lock(mutex);           \
        {   unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \
            unsigned long long const elapsedTime = (afterTime-beforeTime); \
            if (elapsedTime > 1000) {  /* or whatever threshold you like; I'm using 1 millisecond here */ \
                DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread took %llu microseconds to acquire mutex %s \n", \
                    elapsedTime, #mutex);         \
        }   }                                     \
    } else {                                      \
        ZSTD_pthread_mutex_lock(mutex);           \
    }                                             \
}

#else

#  define ZSTD_PTHREAD_MUTEX_LOCK(m) ZSTD_pthread_mutex_lock(m)
#  define DEBUG_PRINTHEX(l,p,n) {}

#endif


/* =====   Buffer Pool   ===== */
/* a single Buffer Pool can be invoked from multiple threads in parallel */

typedef struct buffer_s {
    void* start;
    size_t size;
} buffer_t;

static const buffer_t g_nullBuffer = { NULL, 0 };

typedef struct ZSTDMT_bufferPool_s {
    ZSTD_pthread_mutex_t poolMutex;
    size_t bufferSize;
    unsigned totalBuffers;
    unsigned nbBuffers;
    ZSTD_customMem cMem;
    buffer_t bTable[1];   /* variable size */
} ZSTDMT_bufferPool;

static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads, ZSTD_customMem cMem)
{
    unsigned const maxNbBuffers = 2*nbThreads + 3;
    ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc(
        sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
    if (bufPool==NULL) return NULL;
    if (ZSTD_pthread_mutex_init(&bufPool->poolMutex, NULL)) {
        ZSTD_free(bufPool, cMem);
        return NULL;
    }
    bufPool->bufferSize = 64 KB;
    bufPool->totalBuffers = maxNbBuffers;
    bufPool->nbBuffers = 0;
    bufPool->cMem = cMem;
    return bufPool;
}

static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
{
    unsigned u;
    DEBUGLOG(3, "ZSTDMT_freeBufferPool (address:%08X)", (U32)(size_t)bufPool);
    if (!bufPool) return;   /* compatibility with free on NULL */
    for (u=0; u<bufPool->totalBuffers; u++) {
        DEBUGLOG(4, "free buffer %2u (address:%08X)", u, (U32)(size_t)bufPool->bTable[u].start);
        ZSTD_free(bufPool->bTable[u].start, bufPool->cMem);
    }
    ZSTD_pthread_mutex_destroy(&bufPool->poolMutex);
    ZSTD_free(bufPool, bufPool->cMem);
}

/* only works at initialization, not during compression */
static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
{
    size_t const poolSize = sizeof(*bufPool)
                          + (bufPool->totalBuffers - 1) * sizeof(buffer_t);
    unsigned u;
    size_t totalBufferSize = 0;
    ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
    for (u=0; u<bufPool->totalBuffers; u++)
        totalBufferSize += bufPool->bTable[u].size;
    ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);

    return poolSize + totalBufferSize;
}

static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* bufPool, size_t bSize)
{
    bufPool->bufferSize = bSize;
}

/** ZSTDMT_getBuffer() :
 *  assumption : bufPool must be valid */
static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
{
    size_t const bSize = bufPool->bufferSize;
    DEBUGLOG(5, "ZSTDMT_getBuffer");
    ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
    if (bufPool->nbBuffers) {   /* try to use an existing buffer */
        buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)];
        size_t const availBufferSize = buf.size;
        bufPool->bTable[bufPool->nbBuffers] = g_nullBuffer;
        if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) {
            /* large enough, but not too much */
            ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
            return buf;
        }
        /* size conditions not respected : scratch this buffer, create a new one */
        DEBUGLOG(5, "existing buffer does not meet size conditions => freeing");
        ZSTD_free(buf.start, bufPool->cMem);
    }
    ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
    /* create a new buffer */
    DEBUGLOG(5, "create a new buffer");
    {   buffer_t buffer;
        void* const start = ZSTD_malloc(bSize, bufPool->cMem);
        buffer.start = start;   /* note : start can be NULL if malloc fails ! */
        buffer.size = (start==NULL) ? 0 : bSize;
        return buffer;
    }
}

/* store buffer for later re-use, up to pool capacity */
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
{
    if (buf.start == NULL) return;   /* compatible with release on NULL */
    DEBUGLOG(5, "ZSTDMT_releaseBuffer");
    ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
    if (bufPool->nbBuffers < bufPool->totalBuffers) {
        bufPool->bTable[bufPool->nbBuffers++] = buf;   /* stored for later use */
        ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
        return;
    }
    ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
    /* Reached bufferPool capacity (should not happen) */
    DEBUGLOG(5, "buffer pool capacity reached => freeing ");
    ZSTD_free(buf.start, bufPool->cMem);
}
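
/* Usage sketch (illustrative only; hypothetical caller, not part of this file) :
 * a caller first sizes the pool, then borrows a buffer and hands it back
 * for later re-use :
 *
 *     ZSTDMT_setBufferSize(bufPool, ZSTD_compressBound(chunkSize));
 *     {   buffer_t const buf = ZSTDMT_getBuffer(bufPool);
 *         if (buf.start == NULL) return ERROR(memory_allocation);
 *         // ... write up to buf.size bytes into buf.start ...
 *         ZSTDMT_releaseBuffer(bufPool, buf);   // stored for re-use, or freed at capacity
 *     }
 */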
/* Sets parameters relevant to the compression job, initializing others to
 * default values. Notably, nbThreads should probably be zero. */
static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams(ZSTD_CCtx_params const params)
{
    ZSTD_CCtx_params jobParams;
    memset(&jobParams, 0, sizeof(jobParams));

    jobParams.cParams = params.cParams;
    jobParams.fParams = params.fParams;
    jobParams.compressionLevel = params.compressionLevel;

    jobParams.ldmParams = params.ldmParams;
    return jobParams;
}

/* =====   CCtx Pool   ===== */
/* a single CCtx Pool can be invoked from multiple threads in parallel */

typedef struct {
    ZSTD_pthread_mutex_t poolMutex;
    unsigned totalCCtx;
    unsigned availCCtx;
    ZSTD_customMem cMem;
    ZSTD_CCtx* cctx[1];   /* variable size */
} ZSTDMT_CCtxPool;

/* note : all CCtx borrowed from the pool should be released back to the pool _before_ freeing the pool */
static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
{
    unsigned u;
    for (u=0; u<pool->totalCCtx; u++)
        ZSTD_freeCCtx(pool->cctx[u]);   /* note : compatible with free on NULL */
    ZSTD_pthread_mutex_destroy(&pool->poolMutex);
    ZSTD_free(pool, pool->cMem);
}

/* ZSTDMT_createCCtxPool() :
 * implies nbThreads >= 1, checked by caller ZSTDMT_createCCtx() */
static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads,
                                              ZSTD_customMem cMem)
{
    ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc(
        sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*), cMem);
    if (!cctxPool) return NULL;
    if (ZSTD_pthread_mutex_init(&cctxPool->poolMutex, NULL)) {
        ZSTD_free(cctxPool, cMem);
        return NULL;
    }
    cctxPool->cMem = cMem;
    cctxPool->totalCCtx = nbThreads;
    cctxPool->availCCtx = 1;   /* at least one cctx for single-thread mode */
    cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem);
    if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; }
    DEBUGLOG(3, "cctxPool created, with %u threads", nbThreads);
    return cctxPool;
}

/* only works during initialization phase, not during compression */
static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
{
    ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
    {   unsigned const nbThreads = cctxPool->totalCCtx;
        size_t const poolSize = sizeof(*cctxPool)
                              + (nbThreads-1)*sizeof(ZSTD_CCtx*);
        unsigned u;
        size_t totalCCtxSize = 0;
        for (u=0; u<nbThreads; u++) {
            totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]);
        }
        ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex);
        return poolSize + totalCCtxSize;
    }
}

static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
{
    DEBUGLOG(5, "ZSTDMT_getCCtx");
    ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
    if (cctxPool->availCCtx) {
        cctxPool->availCCtx--;
        {   ZSTD_CCtx* const cctx = cctxPool->cctx[cctxPool->availCCtx];
            ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex);
            return cctx;
    }   }
    ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex);
    DEBUGLOG(5, "create one more CCtx");
    return ZSTD_createCCtx_advanced(cctxPool->cMem);   /* note : can be NULL, when creation fails ! */
}
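
/* Borrow/release sketch (illustrative only; hypothetical caller, not part of
 * this file). Contexts are borrowed for the duration of one job; when the
 * pool is empty, ZSTDMT_getCCtx() allocates a fresh one, and
 * ZSTDMT_releaseCCtx() (below) frees any context beyond pool capacity :
 *
 *     ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(cctxPool);
 *     if (cctx == NULL) return ERROR(memory_allocation);
 *     // ... compress one chunk with cctx ...
 *     ZSTDMT_releaseCCtx(cctxPool, cctx);   // back into the pool, or freed
 */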
static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
{
    if (cctx==NULL) return;   /* compatibility with release on NULL */
    ZSTD_pthread_mutex_lock(&pool->poolMutex);
    if (pool->availCCtx < pool->totalCCtx)
        pool->cctx[pool->availCCtx++] = cctx;
    else {
        /* pool overflow : should not happen, since totalCCtx==nbThreads */
        DEBUGLOG(5, "CCtx pool overflow : free cctx");
        ZSTD_freeCCtx(cctx);
    }
    ZSTD_pthread_mutex_unlock(&pool->poolMutex);
}


/* =====   Thread worker   ===== */

typedef struct {
    buffer_t src;
    const void* srcStart;
    size_t dictSize;
    size_t srcSize;
    buffer_t dstBuff;
    size_t cSize;
    size_t dstFlushed;
    unsigned firstChunk;
    unsigned lastChunk;
    unsigned jobCompleted;
    unsigned jobScanned;
    ZSTD_pthread_mutex_t* jobCompleted_mutex;
    ZSTD_pthread_cond_t* jobCompleted_cond;
    ZSTD_CCtx_params params;
    const ZSTD_CDict* cdict;
    ZSTDMT_CCtxPool* cctxPool;
    ZSTDMT_bufferPool* bufPool;
    unsigned long long fullFrameSize;
} ZSTDMT_jobDescription;

/* ZSTDMT_compressChunk() : POOL_function type */
void ZSTDMT_compressChunk(void* jobDescription)
{
    ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
    ZSTD_CCtx* cctx = ZSTDMT_getCCtx(job->cctxPool);
    const void* const src = (const char*)job->srcStart + job->dictSize;
    buffer_t dstBuff = job->dstBuff;
    DEBUGLOG(5, "job (first:%u) (last:%u) : dictSize %u, srcSize %u",
                job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize);

    if (cctx==NULL) {
        job->cSize = ERROR(memory_allocation);
        goto _endJob;
    }

    if (dstBuff.start == NULL) {
        dstBuff = ZSTDMT_getBuffer(job->bufPool);
        if (dstBuff.start==NULL) {
            job->cSize = ERROR(memory_allocation);
            goto _endJob;
        }
        job->dstBuff = dstBuff;
    }

    if (job->cdict) {   /* should only happen for first segment */
        size_t const initError = ZSTD_compressBegin_usingCDict_advanced(cctx, job->cdict, job->params.fParams, job->fullFrameSize);
        DEBUGLOG(5, "using CDict");
        if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
    } else {   /* srcStart points at reloaded section */
        if (!job->firstChunk) job->params.fParams.contentSizeFlag = 0;  /* ensure no srcSize control */
        {   ZSTD_CCtx_params jobParams = job->params;
            size_t const forceWindowError =
                ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk);
            /* Force loading dictionary in "content-only" mode (no header analysis) */
            size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, job->srcStart, job->dictSize, ZSTD_dm_rawContent, jobParams, job->fullFrameSize);
            if (ZSTD_isError(initError) || ZSTD_isError(forceWindowError)) {
                /* report whichever call actually failed */
                job->cSize = ZSTD_isError(initError) ? initError : forceWindowError;
                goto _endJob;
            }
    }   }
    if (!job->firstChunk) {   /* flush and overwrite frame header when it's not first segment */
        size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0);
        if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
        ZSTD_invalidateRepCodes(cctx);
    }

    DEBUGLOG(5, "Compressing : ");
    DEBUG_PRINTHEX(4, job->srcStart, 12);
    job->cSize = (job->lastChunk) ?
                 ZSTD_compressEnd     (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
                 ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
    DEBUGLOG(5, "compressed %u bytes into %u bytes   (first:%u) (last:%u)",
                (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
    DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize));

_endJob:
    ZSTDMT_releaseCCtx(job->cctxPool, cctx);
    ZSTDMT_releaseBuffer(job->bufPool, job->src);
    job->src = g_nullBuffer; job->srcStart = NULL;
    ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
    job->jobCompleted = 1;
    job->jobScanned = 0;
    ZSTD_pthread_cond_signal(job->jobCompleted_cond);
    ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
}
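
/* Completion handshake sketch (illustrative only) :
 * a producer posts the job, then blocks on the shared condition variable
 * until the worker flips jobCompleted under the same mutex :
 *
 *     POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[jobID]);
 *     ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
 *     while (mtctx->jobs[jobID].jobCompleted == 0)
 *         ZSTD_pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
 *     ZSTD_pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
 *
 * This is the same pattern used by ZSTDMT_waitForAllJobsCompleted() and the
 * flush path below.
 */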

/* ------------------------------------------ */
/* =====   Multi-threaded compression   ===== */
/* ------------------------------------------ */

typedef struct {
    buffer_t buffer;
    size_t filled;
} inBuff_t;

struct ZSTDMT_CCtx_s {
    POOL_ctx* factory;
    ZSTDMT_jobDescription* jobs;
    ZSTDMT_bufferPool* bufPool;
    ZSTDMT_CCtxPool* cctxPool;
    ZSTD_pthread_mutex_t jobCompleted_mutex;
    ZSTD_pthread_cond_t jobCompleted_cond;
    size_t targetSectionSize;
    size_t inBuffSize;
    size_t dictSize;
    size_t targetDictSize;
    inBuff_t inBuff;
    ZSTD_CCtx_params params;
    XXH64_state_t xxhState;
    unsigned jobIDMask;
    unsigned doneJobID;
    unsigned nextJobID;
    unsigned frameEnded;
    unsigned allJobsCompleted;
    unsigned long long frameContentSize;
    ZSTD_customMem cMem;
    ZSTD_CDict* cdictLocal;
    const ZSTD_CDict* cdict;
};

static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
{
    U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
    U32 const nbJobs = 1 << nbJobsLog2;
    *nbJobsPtr = nbJobs;
    return (ZSTDMT_jobDescription*) ZSTD_calloc(
                            nbJobs * sizeof(ZSTDMT_jobDescription), cMem);
}

/* Internal only */
size_t ZSTDMT_initializeCCtxParameters(ZSTD_CCtx_params* params, unsigned nbThreads)
{
    params->nbThreads = nbThreads;
    params->overlapSizeLog = ZSTDMT_OVERLAPLOG_DEFAULT;
    params->jobSize = 0;
    return 0;
}

ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
{
    ZSTDMT_CCtx* mtctx;
    U32 nbJobs = nbThreads + 2;
    DEBUGLOG(3, "ZSTDMT_createCCtx_advanced");

    if (nbThreads < 1) return NULL;
    nbThreads = MIN(nbThreads, ZSTDMT_NBTHREADS_MAX);
    if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL))
        /* invalid custom allocator */
        return NULL;

    mtctx = (ZSTDMT_CCtx*) ZSTD_calloc(sizeof(ZSTDMT_CCtx), cMem);
    if (!mtctx) return NULL;
    ZSTDMT_initializeCCtxParameters(&mtctx->params, nbThreads);
    mtctx->cMem = cMem;
    mtctx->allJobsCompleted = 1;
    mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem);
    mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem);
    mtctx->jobIDMask = nbJobs - 1;
    mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem);
    mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem);
    if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool) {
        ZSTDMT_freeCCtx(mtctx);
        return NULL;
    }
    if (ZSTD_pthread_mutex_init(&mtctx->jobCompleted_mutex, NULL)) {
        ZSTDMT_freeCCtx(mtctx);
        return NULL;
    }
    if (ZSTD_pthread_cond_init(&mtctx->jobCompleted_cond, NULL)) {
        ZSTDMT_freeCCtx(mtctx);
        return NULL;
    }
    DEBUGLOG(3, "mt_cctx created, for %u threads", nbThreads);
    return mtctx;
}

ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads)
{
    return ZSTDMT_createCCtx_advanced(nbThreads, ZSTD_defaultCMem);
}

/* ZSTDMT_releaseAllJobResources() :
 * note : ensure all workers are killed first ! */
static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
{
    unsigned jobID;
    DEBUGLOG(3, "ZSTDMT_releaseAllJobResources");
    for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) {
        DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start);
        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
        mtctx->jobs[jobID].dstBuff = g_nullBuffer;
        DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].src.start);
        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].src);
        mtctx->jobs[jobID].src = g_nullBuffer;
    }
    memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription));
    DEBUGLOG(4, "input: release address %08X", (U32)(size_t)mtctx->inBuff.buffer.start);
    ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->inBuff.buffer);
    mtctx->inBuff.buffer = g_nullBuffer;
    mtctx->allJobsCompleted = 1;
}

static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs)
{
    DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
    while (zcs->doneJobID < zcs->nextJobID) {
        unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
        ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
        while (zcs->jobs[jobID].jobCompleted==0) {
            DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID);   /* we want to block when waiting for data to flush */
            ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
        }
        ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
        zcs->doneJobID++;
    }
}

size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
{
    if (mtctx==NULL) return 0;   /* compatible with free on NULL */
    POOL_free(mtctx->factory);   /* stop and free worker threads */
    ZSTDMT_releaseAllJobResources(mtctx);   /* release job resources into pools first */
    ZSTD_free(mtctx->jobs, mtctx->cMem);
    ZSTDMT_freeBufferPool(mtctx->bufPool);
    ZSTDMT_freeCCtxPool(mtctx->cctxPool);
    ZSTD_freeCDict(mtctx->cdictLocal);
    ZSTD_pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
    ZSTD_pthread_cond_destroy(&mtctx->jobCompleted_cond);
    ZSTD_free(mtctx, mtctx->cMem);
    return 0;
}

size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)
{
    if (mtctx == NULL) return 0;   /* supports sizeof NULL */
    return sizeof(*mtctx)
            + POOL_sizeof(mtctx->factory)
            + ZSTDMT_sizeof_bufferPool(mtctx->bufPool)
            + (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription)
            + ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool)
            + ZSTD_sizeof_CDict(mtctx->cdictLocal);
}
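
/* Lifecycle sketch (illustrative only; thread count is hypothetical) :
 *
 *     ZSTDMT_CCtx* const mtctx = ZSTDMT_createCCtx(4);   // 4 worker threads
 *     if (mtctx == NULL) { /- handle allocation failure -/ }
 *     DEBUGLOG(3, "mtctx footprint : %u bytes", (U32)ZSTDMT_sizeof_CCtx(mtctx));
 *     // ... one-shot or streaming compression ...
 *     ZSTDMT_freeCCtx(mtctx);   // also stops and frees the worker pool
 */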
/* Internal only */
size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params,
                                          ZSTDMT_parameter parameter, unsigned value)
{
    switch(parameter)
    {
    case ZSTDMT_p_sectionSize :
        params->jobSize = value;
        return 0;
    case ZSTDMT_p_overlapSectionLog :
        DEBUGLOG(4, "ZSTDMT_p_overlapSectionLog : %u", value);
        params->overlapSizeLog = (value >= 9) ? 9 : value;
        return 0;
    default :
        return ERROR(parameter_unsupported);
    }
}

size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value)
{
    switch(parameter)
    {
    case ZSTDMT_p_sectionSize :
        return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
    case ZSTDMT_p_overlapSectionLog :
        return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
    default :
        return ERROR(parameter_unsupported);
    }
}

/* ------------------------------------------ */
/* =====   Multi-threaded compression   ===== */
/* ------------------------------------------ */

static unsigned computeNbChunks(size_t srcSize, unsigned windowLog, unsigned nbThreads) {
    size_t const chunkSizeTarget = (size_t)1 << (windowLog + 2);
    size_t const chunkMaxSize = chunkSizeTarget << 2;
    size_t const passSizeMax = chunkMaxSize * nbThreads;
    unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1;
    unsigned const nbChunksLarge = multiplier * nbThreads;
    unsigned const nbChunksMax = (unsigned)(srcSize / chunkSizeTarget) + 1;
    unsigned const nbChunksSmall = MIN(nbChunksMax, nbThreads);
    return (multiplier>1) ? nbChunksLarge : nbChunksSmall;
}
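
/* Worked example (hypothetical numbers) :
 * srcSize = 100 MB, windowLog = 21, nbThreads = 4 :
 *   chunkSizeTarget = 1 << 23 = 8 MB
 *   chunkMaxSize    = 32 MB ; passSizeMax = 128 MB
 *   multiplier      = 100/128 + 1 = 1  => "small" path
 *   nbChunksMax     = 100/8 + 1 = 13 ; nbChunksSmall = MIN(13, 4) = 4
 * => 4 chunks of ~25 MB each, one per thread.
 * With srcSize = 1 GB instead : multiplier = 1024/128 + 1 = 9,
 * => nbChunksLarge = 36 chunks, processed in waves over the 4 threads. */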
static size_t ZSTDMT_compress_advanced_internal(
                ZSTDMT_CCtx* mtctx,
                void* dst, size_t dstCapacity,
          const void* src, size_t srcSize,
          const ZSTD_CDict* cdict,
                ZSTD_CCtx_params const params)
{
    ZSTD_CCtx_params const jobParams = ZSTDMT_makeJobCCtxParams(params);
    unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog;
    size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog);
    unsigned nbChunks = computeNbChunks(srcSize, params.cParams.windowLog, params.nbThreads);
    size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks;
    size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize;   /* avoid too small last block */
    const char* const srcStart = (const char*)src;
    size_t remainingSrcSize = srcSize;
    unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize));  /* presumes avgChunkSize >= 256 KB, which should be the case */
    size_t frameStartPos = 0, dstBufferPos = 0;
    XXH64_state_t xxh64;
    assert(jobParams.nbThreads == 0);
    assert(mtctx->cctxPool->totalCCtx == params.nbThreads);

    DEBUGLOG(4, "nbChunks : %2u   (chunkSize : %u bytes)  ", nbChunks, (U32)avgChunkSize);
    if (nbChunks==1) {   /* fallback to single-thread mode */
        ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
        if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams);
        return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams);
    }
    assert(avgChunkSize >= 256 KB);  /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), which is required for compressWithinDst */
    ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgChunkSize) );
    XXH64_reset(&xxh64, 0);

    if (nbChunks > mtctx->jobIDMask+1) {   /* enlarge job table */
        U32 nbJobs = nbChunks;
        ZSTD_free(mtctx->jobs, mtctx->cMem);
        mtctx->jobIDMask = 0;
        mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, mtctx->cMem);
        if (mtctx->jobs==NULL) return ERROR(memory_allocation);
        mtctx->jobIDMask = nbJobs - 1;
    }

    {   unsigned u;
        for (u=0; u<nbChunks; u++) {
            size_t const chunkSize = MIN(remainingSrcSize, avgChunkSize);
            size_t const dstBufferCapacity = ZSTD_compressBound(chunkSize);
            buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity };
            buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer;
            size_t dictSize = u ? overlapSize : 0;

            mtctx->jobs[u].src = g_nullBuffer;
            mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
            mtctx->jobs[u].dictSize = dictSize;
            mtctx->jobs[u].srcSize = chunkSize;
            mtctx->jobs[u].cdict = (mtctx->nextJobID==0) ? cdict : NULL;
            mtctx->jobs[u].fullFrameSize = srcSize;
            mtctx->jobs[u].params = jobParams;
            /* do not calculate checksum within sections, but write it in header for first section */
            if (u!=0) mtctx->jobs[u].params.fParams.checksumFlag = 0;
            mtctx->jobs[u].dstBuff = dstBuffer;
            mtctx->jobs[u].cctxPool = mtctx->cctxPool;
            mtctx->jobs[u].bufPool = mtctx->bufPool;
            mtctx->jobs[u].firstChunk = (u==0);
            mtctx->jobs[u].lastChunk = (u==nbChunks-1);
            mtctx->jobs[u].jobCompleted = 0;
            mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex;
            mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond;

            if (params.fParams.checksumFlag) {
                XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize);
            }

            DEBUGLOG(5, "posting job %u   (%u bytes)", u, (U32)chunkSize);
            DEBUG_PRINTHEX(6, mtctx->jobs[u].srcStart, 12);
            POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]);

            frameStartPos += chunkSize;
            dstBufferPos += dstBufferCapacity;
            remainingSrcSize -= chunkSize;
    }   }

    /* collect result */
    {   size_t error = 0, dstPos = 0;
        unsigned chunkID;
        for (chunkID=0; chunkID<nbChunks; chunkID++) {
            DEBUGLOG(5, "waiting for chunk %u ", chunkID);
            ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
            while (mtctx->jobs[chunkID].jobCompleted==0) {
                DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
                ZSTD_pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
            }
            ZSTD_pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
            DEBUGLOG(5, "ready to write chunk %u ", chunkID);

            mtctx->jobs[chunkID].srcStart = NULL;
            {   size_t const cSize = mtctx->jobs[chunkID].cSize;
                if (ZSTD_isError(cSize)) error = cSize;
                if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall);
                if (chunkID) {   /* note : chunk 0 is written directly at dst, which is correct position */
                    if (!error)
                        memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize);  /* may overlap when chunk compressed within dst */
                    if (chunkID >= compressWithinDst) {  /* chunk compressed into its own buffer, which must be released */
                        DEBUGLOG(5, "releasing buffer %u>=%u", chunkID, compressWithinDst);
                        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[chunkID].dstBuff);
                }   }
                mtctx->jobs[chunkID].dstBuff = g_nullBuffer;
                dstPos += cSize;
            }
        }  /* for (chunkID=0; chunkID<nbChunks; chunkID++) */

        DEBUGLOG(4, "checksumFlag : %u ", params.fParams.checksumFlag);
        if (params.fParams.checksumFlag) {
            U32 const checksum = (U32)XXH64_digest(&xxh64);
            if (dstPos + 4 > dstCapacity) {
                error = ERROR(dstSize_tooSmall);
            } else {
                DEBUGLOG(4, "writing checksum : %08X \n", checksum);
                MEM_writeLE32((char*)dst + dstPos, checksum);
                dstPos += 4;
        }   }

        if (!error) DEBUGLOG(4, "compressed size : %u  ", (U32)dstPos);
        return error ? error : dstPos;
    }
}

size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
                                void* dst, size_t dstCapacity,
                          const void* src, size_t srcSize,
                          const ZSTD_CDict* cdict,
                                ZSTD_parameters const params,
                                unsigned overlapLog)
{
    ZSTD_CCtx_params cctxParams = mtctx->params;
    cctxParams.cParams = params.cParams;
    cctxParams.fParams = params.fParams;
    cctxParams.overlapSizeLog = overlapLog;
    return ZSTDMT_compress_advanced_internal(mtctx,
                                             dst, dstCapacity,
                                             src, srcSize,
                                             cdict, cctxParams);
}


size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
                           void* dst, size_t dstCapacity,
                     const void* src, size_t srcSize,
                           int compressionLevel)
{
    U32 const overlapLog = (compressionLevel >= ZSTD_maxCLevel()) ? 9 : ZSTDMT_OVERLAPLOG_DEFAULT;
    ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0);
    params.fParams.contentSizeFlag = 1;
    return ZSTDMT_compress_advanced(mtctx, dst, dstCapacity, src, srcSize, NULL, params, overlapLog);
}
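
/* One-shot usage sketch (illustrative only; dst/src buffers are hypothetical) :
 *
 *     ZSTDMT_CCtx* const mtctx = ZSTDMT_createCCtx(4);
 *     size_t const dstCapacity = ZSTD_compressBound(srcSize);
 *     size_t const cSize = ZSTDMT_compressCCtx(mtctx, dst, dstCapacity,
 *                                              src, srcSize, 3);
 *     if (ZSTD_isError(cSize)) { /- handle error -/ }
 *     ZSTDMT_freeCCtx(mtctx);
 *
 * Providing dstCapacity >= ZSTD_compressBound(srcSize) also lets chunks be
 * compressed directly into dst (see compressWithinDst above), avoiding
 * intermediate buffers.
 */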

/* ====================================== */
/* =======      Streaming API     ======= */
/* ====================================== */

size_t ZSTDMT_initCStream_internal(
        ZSTDMT_CCtx* zcs,
        const void* dict, size_t dictSize, ZSTD_dictMode_e dictMode,
        const ZSTD_CDict* cdict, ZSTD_CCtx_params params,
        unsigned long long pledgedSrcSize)
{
    DEBUGLOG(4, "ZSTDMT_initCStream_internal");
    /* params are supposed to be fully validated at this point */
    assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
    assert(!((dict) && (cdict)));  /* either dict or cdict, not both */
    assert(zcs->cctxPool->totalCCtx == params.nbThreads);

    if (params.nbThreads==1) {
        ZSTD_CCtx_params const singleThreadParams = ZSTDMT_makeJobCCtxParams(params);
        DEBUGLOG(4, "single thread mode");
        assert(singleThreadParams.nbThreads == 0);
        return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0],
                                         dict, dictSize, cdict,
                                         singleThreadParams, pledgedSrcSize);
    }

    if (zcs->allJobsCompleted == 0) {   /* previous compression not correctly finished */
        ZSTDMT_waitForAllJobsCompleted(zcs);
        ZSTDMT_releaseAllJobResources(zcs);
        zcs->allJobsCompleted = 1;
    }

    zcs->params = params;
    zcs->frameContentSize = pledgedSrcSize;
    if (dict) {
        DEBUGLOG(4,"cdictLocal: %08X", (U32)(size_t)zcs->cdictLocal);
        ZSTD_freeCDict(zcs->cdictLocal);
        zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize,
                                                    ZSTD_dlm_byCopy, dictMode, /* note : a loadPrefix becomes an internal CDict */
                                                    params.cParams, zcs->cMem);
        zcs->cdict = zcs->cdictLocal;
        if (zcs->cdictLocal == NULL) return ERROR(memory_allocation);
    } else {
        DEBUGLOG(4,"cdictLocal: %08X", (U32)(size_t)zcs->cdictLocal);
        ZSTD_freeCDict(zcs->cdictLocal);
        zcs->cdictLocal = NULL;
        zcs->cdict = cdict;
    }

    zcs->targetDictSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog));
    DEBUGLOG(4, "overlapLog : %u ", params.overlapSizeLog);
    DEBUGLOG(4, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10));
    zcs->targetSectionSize = params.jobSize ? params.jobSize : (size_t)1 << (params.cParams.windowLog + 2);
    zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize);
    zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize);
    DEBUGLOG(4, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10));
    zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize;
    ZSTDMT_setBufferSize(zcs->bufPool, MAX(zcs->inBuffSize, ZSTD_compressBound(zcs->targetSectionSize)) );
    zcs->inBuff.buffer = g_nullBuffer;
    zcs->dictSize = 0;
    zcs->doneJobID = 0;
    zcs->nextJobID = 0;
    zcs->frameEnded = 0;
    zcs->allJobsCompleted = 0;
    if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0);
    return 0;
}

size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx,
                                   const void* dict, size_t dictSize,
                                   ZSTD_parameters params,
                                   unsigned long long pledgedSrcSize)
{
    ZSTD_CCtx_params cctxParams = mtctx->params;
    DEBUGLOG(5, "ZSTDMT_initCStream_advanced");
    cctxParams.cParams = params.cParams;
    cctxParams.fParams = params.fParams;
    return ZSTDMT_initCStream_internal(mtctx, dict, dictSize, ZSTD_dm_auto, NULL,
                                       cctxParams, pledgedSrcSize);
}

size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
                                     const ZSTD_CDict* cdict,
                                     ZSTD_frameParameters fParams,
                                     unsigned long long pledgedSrcSize)
{
    ZSTD_CCtx_params cctxParams = mtctx->params;
    if (cdict==NULL) return ERROR(dictionary_wrong);   /* method incompatible with NULL cdict */
    cctxParams.cParams = ZSTD_getCParamsFromCDict(cdict);   /* check cdict before reading from it */
    cctxParams.fParams = fParams;
    return ZSTDMT_initCStream_internal(mtctx, NULL, 0 /*dictSize*/, ZSTD_dm_auto, cdict,
                                       cctxParams, pledgedSrcSize);
}


/* ZSTDMT_resetCStream() :
 * pledgedSrcSize is optional and can be zero == unknown */
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
{
    if (zcs->params.nbThreads==1)
        return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize);
    return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, NULL, zcs->params,
                                       pledgedSrcSize);
}

size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel)
{
    ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0);
    ZSTD_CCtx_params cctxParams = zcs->params;
    cctxParams.cParams = params.cParams;
    cctxParams.fParams = params.fParams;
    return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, NULL, cctxParams, 0);
}
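
/* Worked example (hypothetical parameters, assuming ZSTDMT_SECTION_SIZE_MIN <= 32 MB) :
 * windowLog = 23, overlapSizeLog = 6 (default), jobSize = 0 :
 *   targetDictSize    = 1 << (23 - (9-6)) = 1 << 20  =  1 MB of overlap
 *   targetSectionSize = 1 << (23 + 2)     = 1 << 25  = 32 MB per job
 *   inBuffSize        = 1 MB + 32 MB      = 33 MB
 * so each job re-loads the last 1 MB of the previous section as a prefix
 * ("dictionary") before compressing its own 32 MB of fresh input. */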

static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsigned endFrame)
{
    unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;

    DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ",
                zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize);
    zcs->jobs[jobID].src = zcs->inBuff.buffer;
    zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
    zcs->jobs[jobID].srcSize = srcSize;
    zcs->jobs[jobID].dictSize = zcs->dictSize;
    assert(zcs->inBuff.filled >= srcSize + zcs->dictSize);
    zcs->jobs[jobID].params = zcs->params;
    /* do not calculate checksum within sections, but write it in header for first section */
    if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
    zcs->jobs[jobID].cdict = (zcs->nextJobID==0) ? zcs->cdict : NULL;
    zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
    zcs->jobs[jobID].dstBuff = g_nullBuffer;
    zcs->jobs[jobID].cctxPool = zcs->cctxPool;
    zcs->jobs[jobID].bufPool = zcs->bufPool;
    zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
    zcs->jobs[jobID].lastChunk = endFrame;
    zcs->jobs[jobID].jobCompleted = 0;
    zcs->jobs[jobID].dstFlushed = 0;
    zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
    zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;

    if (zcs->params.fParams.checksumFlag)
        XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->dictSize, srcSize);

    /* get a new buffer for next input */
    if (!endFrame) {
        size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize);
        zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
        if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
            zcs->jobs[jobID].jobCompleted = 1;
            zcs->nextJobID++;
            ZSTDMT_waitForAllJobsCompleted(zcs);
            ZSTDMT_releaseAllJobResources(zcs);
            return ERROR(memory_allocation);
        }
        zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize;
        memmove(zcs->inBuff.buffer.start,
                (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize,
                zcs->inBuff.filled);
        zcs->dictSize = newDictSize;
    } else {   /* if (endFrame==1) */
        zcs->inBuff.buffer = g_nullBuffer;
        zcs->inBuff.filled = 0;
        zcs->dictSize = 0;
        zcs->frameEnded = 1;
        if (zcs->nextJobID == 0) {
            /* single chunk exception : checksum is calculated directly within worker thread */
            zcs->params.fParams.checksumFlag = 0;
    }   }

    DEBUGLOG(4, "posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)",
                zcs->nextJobID,
                (U32)zcs->jobs[jobID].srcSize,
                zcs->jobs[jobID].lastChunk,
                zcs->doneJobID,
                zcs->doneJobID & zcs->jobIDMask);
    POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);   /* this call is blocking when thread worker pool is exhausted */
    zcs->nextJobID++;
    return 0;
}


/* ZSTDMT_flushNextJob() :
 * output : will be updated with amount of data flushed.
 * blockToFlush : if >0, the function will block and wait if there is no data available to flush.
 * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more, or an error code */
static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
{
    unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
    if (zcs->doneJobID == zcs->nextJobID) return 0;   /* all flushed ! */
    ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
    while (zcs->jobs[wJobID].jobCompleted==0) {
        DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);
        if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; }  /* nothing ready to be flushed => skip */
        ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);  /* block when nothing available to flush */
    }
    ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
    /* compression job completed : output can be flushed */
    {   ZSTDMT_jobDescription job = zcs->jobs[wJobID];
        if (!job.jobScanned) {
            if (ZSTD_isError(job.cSize)) {
                DEBUGLOG(5, "compression error detected ");
                ZSTDMT_waitForAllJobsCompleted(zcs);
                ZSTDMT_releaseAllJobResources(zcs);
                return job.cSize;
            }
            DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag);
            if (zcs->params.fParams.checksumFlag) {
                if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) {  /* write checksum at end of last section */
                    U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
                    DEBUGLOG(5, "writing checksum : %08X \n", checksum);
                    MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
                    job.cSize += 4;
                    zcs->jobs[wJobID].cSize += 4;
            }   }
            zcs->jobs[wJobID].jobScanned = 1;
        }
        {   size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
            DEBUGLOG(5, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
            memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
            output->pos += toWrite;
            job.dstFlushed += toWrite;
        }
        if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => move to next one */
            ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
            zcs->jobs[wJobID].dstBuff = g_nullBuffer;
            zcs->jobs[wJobID].jobCompleted = 0;
            zcs->doneJobID++;
        } else {
            zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
        }
        /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
        if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
        if (zcs->doneJobID < zcs->nextJobID) return 1;   /* still some buffer to flush */
        zcs->allJobsCompleted = zcs->frameEnded;   /* frame completed and entirely flushed */
        return 0;   /* everything flushed */
}   }


/** ZSTDMT_compressStream_generic() :
 *  internal use only - exposed to be invoked from zstd_compress.c
 *  assumption : output and input are valid (pos <= size)
 * @return : minimum amount of data remaining to flush, 0 if none */
size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
                                     ZSTD_outBuffer* output,
                                     ZSTD_inBuffer* input,
                                     ZSTD_EndDirective endOp)
{
    size_t const newJobThreshold = mtctx->dictSize + mtctx->targetSectionSize;
    unsigned forwardInputProgress = 0;
    assert(output->pos <= output->size);
    assert(input->pos <= input->size);
    if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) {
        /* current frame being ended. Only flush/end are allowed */
        return ERROR(stage_wrong);
    }
    if (mtctx->params.nbThreads==1) {  /* delegate to single-thread (synchronous) */
        return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp);
    }

    /* single-pass shortcut (note : synchronous-mode) */
    if ( (mtctx->nextJobID == 0)      /* just started */
      && (mtctx->inBuff.filled == 0)  /* nothing buffered */
      && (endOp == ZSTD_e_end)        /* end order */
      && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) {  /* enough room */
        size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx,
                (char*)output->dst + output->pos, output->size - output->pos,
                (const char*)input->src + input->pos, input->size - input->pos,
                mtctx->cdict, mtctx->params);
        if (ZSTD_isError(cSize)) return cSize;
        input->pos = input->size;
        output->pos += cSize;
        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->inBuff.buffer);  /* was allocated in initStream */
        mtctx->allJobsCompleted = 1;
        mtctx->frameEnded = 1;
        return 0;
    }

    /* fill input buffer */
    if (input->size > input->pos) {   /* support NULL input */
        if (mtctx->inBuff.buffer.start == NULL) {
            mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);  /* note : may fail, in which case, no forward input progress */
            mtctx->inBuff.filled = 0;
        }
        if (mtctx->inBuff.buffer.start) {
            size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
            DEBUGLOG(5, "inBuff:%08X;  inBuffSize=%u;  ToCopy=%u", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad);
            memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
            input->pos += toLoad;
            mtctx->inBuff.filled += toLoad;
            forwardInputProgress = toLoad>0;
    }   }

    if ( (mtctx->inBuff.filled >= newJobThreshold)  /* filled enough : let's compress */
      && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) {   /* avoid overwriting job round buffer */
        CHECK_F( ZSTDMT_createCompressionJob(mtctx, mtctx->targetSectionSize, 0 /* endFrame */) );
    }

    /* check for potential compressed data ready to be flushed */
    CHECK_F( ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress /* blockToFlush */) );  /* block if there was no forward input progress */

    if (input->pos < input->size)  /* input not consumed : do not flush yet */
        endOp = ZSTD_e_continue;

    switch(endOp)
    {
        case ZSTD_e_flush:
            return ZSTDMT_flushStream(mtctx, output);
        case ZSTD_e_end:
            return ZSTDMT_endStream(mtctx, output);
        case ZSTD_e_continue:
            return 1;
        default:
            return ERROR(GENERIC);   /* invalid endDirective */
    }
}


size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
    CHECK_F( ZSTDMT_compressStream_generic(zcs, output, input, ZSTD_e_continue) );

    /* recommended next input size : fill current input buffer */
    return zcs->inBuffSize - zcs->inBuff.filled;   /* note : could be zero when input buffer is fully filled and no more availability to create new job */
}


static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame)
{
    size_t const srcSize = zcs->inBuff.filled - zcs->dictSize;

    if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded))
      && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {
        CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) );
    }

    /* check if there is any data available to flush */
    return ZSTDMT_flushNextJob(zcs, output, 1 /* blockToFlush */);
}


size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{
    DEBUGLOG(5, "ZSTDMT_flushStream");
    if (zcs->params.nbThreads==1)
        return ZSTD_flushStream(zcs->cctxPool->cctx[0], output);
    return ZSTDMT_flushStream_internal(zcs, output, 0 /* endFrame */);
}

size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{
    DEBUGLOG(4, "ZSTDMT_endStream");
    if (zcs->params.nbThreads==1)
        return ZSTD_endStream(zcs->cctxPool->cctx[0], output);
    return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */);
}
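
/* Streaming usage sketch (illustrative only; readInput/writeOutput are
 * hypothetical I/O helpers, not part of this file) :
 *
 *     ZSTDMT_CCtx* const zcs = ZSTDMT_createCCtx(4);
 *     ZSTDMT_initCStream(zcs, 3);
 *     while (/- more input -/) {
 *         ZSTD_inBuffer in = { inBuf, readInput(inBuf, inBufSize), 0 };
 *         while (in.pos < in.size) {
 *             ZSTD_outBuffer out = { outBuf, outBufSize, 0 };
 *             CHECK_F( ZSTDMT_compressStream(zcs, &out, &in) );
 *             writeOutput(outBuf, out.pos);
 *         }
 *     }
 *     {   size_t remaining;
 *         do {   ZSTD_outBuffer out = { outBuf, outBufSize, 0 };
 *                remaining = ZSTDMT_endStream(zcs, &out);  // 0 when fully flushed
 *                if (ZSTD_isError(remaining)) { /- handle error -/ }
 *                writeOutput(outBuf, out.pos);
 *         } while (remaining != 0);
 *     }
 *     ZSTDMT_freeCCtx(zcs);
 */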