10c16b537SWarner Losh /* 20c16b537SWarner Losh * Copyright (c) 2016-present, Yann Collet, Facebook, Inc. 30c16b537SWarner Losh * All rights reserved. 40c16b537SWarner Losh * 50c16b537SWarner Losh * This source code is licensed under both the BSD-style license (found in the 60c16b537SWarner Losh * LICENSE file in the root directory of this source tree) and the GPLv2 (found 70c16b537SWarner Losh * in the COPYING file in the root directory of this source tree). 80c16b537SWarner Losh * You may select, at your option, one of the above-listed licenses. 90c16b537SWarner Losh */ 100c16b537SWarner Losh 110c16b537SWarner Losh 120c16b537SWarner Losh /* ====== Dependencies ======= */ 130c16b537SWarner Losh #include <stddef.h> /* size_t */ 140c16b537SWarner Losh #include "pool.h" 15*19fcbaf1SConrad Meyer #include "zstd_internal.h" /* ZSTD_malloc, ZSTD_free */ 160c16b537SWarner Losh 170c16b537SWarner Losh /* ====== Compiler specifics ====== */ 180c16b537SWarner Losh #if defined(_MSC_VER) 190c16b537SWarner Losh # pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */ 200c16b537SWarner Losh #endif 210c16b537SWarner Losh 220c16b537SWarner Losh 230c16b537SWarner Losh #ifdef ZSTD_MULTITHREAD 240c16b537SWarner Losh 250c16b537SWarner Losh #include "threading.h" /* pthread adaptation */ 260c16b537SWarner Losh 270c16b537SWarner Losh /* A job is a function and an opaque argument */ 280c16b537SWarner Losh typedef struct POOL_job_s { 290c16b537SWarner Losh POOL_function function; 300c16b537SWarner Losh void *opaque; 310c16b537SWarner Losh } POOL_job; 320c16b537SWarner Losh 330c16b537SWarner Losh struct POOL_ctx_s { 340c16b537SWarner Losh ZSTD_customMem customMem; 350c16b537SWarner Losh /* Keep track of the threads */ 360c16b537SWarner Losh ZSTD_pthread_t *threads; 370c16b537SWarner Losh size_t numThreads; 380c16b537SWarner Losh 390c16b537SWarner Losh /* The queue is a circular buffer */ 400c16b537SWarner Losh POOL_job *queue; 410c16b537SWarner Losh size_t queueHead; 420c16b537SWarner Losh size_t queueTail; 430c16b537SWarner Losh size_t queueSize; 440c16b537SWarner Losh 450c16b537SWarner Losh /* The number of threads working on jobs */ 460c16b537SWarner Losh size_t numThreadsBusy; 470c16b537SWarner Losh /* Indicates if the queue is empty */ 480c16b537SWarner Losh int queueEmpty; 490c16b537SWarner Losh 500c16b537SWarner Losh /* The mutex protects the queue */ 510c16b537SWarner Losh ZSTD_pthread_mutex_t queueMutex; 520c16b537SWarner Losh /* Condition variable for pushers to wait on when the queue is full */ 530c16b537SWarner Losh ZSTD_pthread_cond_t queuePushCond; 540c16b537SWarner Losh /* Condition variables for poppers to wait on when the queue is empty */ 550c16b537SWarner Losh ZSTD_pthread_cond_t queuePopCond; 560c16b537SWarner Losh /* Indicates if the queue is shutting down */ 570c16b537SWarner Losh int shutdown; 580c16b537SWarner Losh }; 590c16b537SWarner Losh 600c16b537SWarner Losh /* POOL_thread() : 610c16b537SWarner Losh Work thread for the thread pool. 620c16b537SWarner Losh Waits for jobs and executes them. 630c16b537SWarner Losh @returns : NULL on failure else non-null. 640c16b537SWarner Losh */ 650c16b537SWarner Losh static void* POOL_thread(void* opaque) { 660c16b537SWarner Losh POOL_ctx* const ctx = (POOL_ctx*)opaque; 670c16b537SWarner Losh if (!ctx) { return NULL; } 680c16b537SWarner Losh for (;;) { 690c16b537SWarner Losh /* Lock the mutex and wait for a non-empty queue or until shutdown */ 700c16b537SWarner Losh ZSTD_pthread_mutex_lock(&ctx->queueMutex); 710c16b537SWarner Losh 720c16b537SWarner Losh while (ctx->queueEmpty && !ctx->shutdown) { 730c16b537SWarner Losh ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); 740c16b537SWarner Losh } 750c16b537SWarner Losh /* empty => shutting down: so stop */ 760c16b537SWarner Losh if (ctx->queueEmpty) { 770c16b537SWarner Losh ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 780c16b537SWarner Losh return opaque; 790c16b537SWarner Losh } 800c16b537SWarner Losh /* Pop a job off the queue */ 810c16b537SWarner Losh { POOL_job const job = ctx->queue[ctx->queueHead]; 820c16b537SWarner Losh ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; 830c16b537SWarner Losh ctx->numThreadsBusy++; 840c16b537SWarner Losh ctx->queueEmpty = ctx->queueHead == ctx->queueTail; 850c16b537SWarner Losh /* Unlock the mutex, signal a pusher, and run the job */ 860c16b537SWarner Losh ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 870c16b537SWarner Losh ZSTD_pthread_cond_signal(&ctx->queuePushCond); 880c16b537SWarner Losh 890c16b537SWarner Losh job.function(job.opaque); 900c16b537SWarner Losh 910c16b537SWarner Losh /* If the intended queue size was 0, signal after finishing job */ 920c16b537SWarner Losh if (ctx->queueSize == 1) { 930c16b537SWarner Losh ZSTD_pthread_mutex_lock(&ctx->queueMutex); 940c16b537SWarner Losh ctx->numThreadsBusy--; 950c16b537SWarner Losh ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 960c16b537SWarner Losh ZSTD_pthread_cond_signal(&ctx->queuePushCond); 970c16b537SWarner Losh } } 980c16b537SWarner Losh } /* for (;;) */ 990c16b537SWarner Losh /* Unreachable */ 1000c16b537SWarner Losh } 1010c16b537SWarner Losh 1020c16b537SWarner Losh POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { 1030c16b537SWarner Losh return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem); 1040c16b537SWarner Losh } 1050c16b537SWarner Losh 1060c16b537SWarner Losh POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) { 1070c16b537SWarner Losh POOL_ctx* ctx; 1080c16b537SWarner Losh /* Check the parameters */ 1090c16b537SWarner Losh if (!numThreads) { return NULL; } 1100c16b537SWarner Losh /* Allocate the context and zero initialize */ 1110c16b537SWarner Losh ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem); 1120c16b537SWarner Losh if (!ctx) { return NULL; } 1130c16b537SWarner Losh /* Initialize the job queue. 1140c16b537SWarner Losh * It needs one extra space since one space is wasted to differentiate empty 1150c16b537SWarner Losh * and full queues. 1160c16b537SWarner Losh */ 1170c16b537SWarner Losh ctx->queueSize = queueSize + 1; 118052d3c12SConrad Meyer ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem); 1190c16b537SWarner Losh ctx->queueHead = 0; 1200c16b537SWarner Losh ctx->queueTail = 0; 1210c16b537SWarner Losh ctx->numThreadsBusy = 0; 1220c16b537SWarner Losh ctx->queueEmpty = 1; 1230c16b537SWarner Losh (void)ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL); 1240c16b537SWarner Losh (void)ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL); 1250c16b537SWarner Losh (void)ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL); 1260c16b537SWarner Losh ctx->shutdown = 0; 1270c16b537SWarner Losh /* Allocate space for the thread handles */ 1280c16b537SWarner Losh ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem); 1290c16b537SWarner Losh ctx->numThreads = 0; 1300c16b537SWarner Losh ctx->customMem = customMem; 1310c16b537SWarner Losh /* Check for errors */ 1320c16b537SWarner Losh if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; } 1330c16b537SWarner Losh /* Initialize the threads */ 1340c16b537SWarner Losh { size_t i; 1350c16b537SWarner Losh for (i = 0; i < numThreads; ++i) { 1360c16b537SWarner Losh if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) { 1370c16b537SWarner Losh ctx->numThreads = i; 1380c16b537SWarner Losh POOL_free(ctx); 1390c16b537SWarner Losh return NULL; 1400c16b537SWarner Losh } } 1410c16b537SWarner Losh ctx->numThreads = numThreads; 1420c16b537SWarner Losh } 1430c16b537SWarner Losh return ctx; 1440c16b537SWarner Losh } 1450c16b537SWarner Losh 1460c16b537SWarner Losh /*! POOL_join() : 1470c16b537SWarner Losh Shutdown the queue, wake any sleeping threads, and join all of the threads. 1480c16b537SWarner Losh */ 1490c16b537SWarner Losh static void POOL_join(POOL_ctx* ctx) { 1500c16b537SWarner Losh /* Shut down the queue */ 1510c16b537SWarner Losh ZSTD_pthread_mutex_lock(&ctx->queueMutex); 1520c16b537SWarner Losh ctx->shutdown = 1; 1530c16b537SWarner Losh ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 1540c16b537SWarner Losh /* Wake up sleeping threads */ 1550c16b537SWarner Losh ZSTD_pthread_cond_broadcast(&ctx->queuePushCond); 1560c16b537SWarner Losh ZSTD_pthread_cond_broadcast(&ctx->queuePopCond); 1570c16b537SWarner Losh /* Join all of the threads */ 1580c16b537SWarner Losh { size_t i; 1590c16b537SWarner Losh for (i = 0; i < ctx->numThreads; ++i) { 1600c16b537SWarner Losh ZSTD_pthread_join(ctx->threads[i], NULL); 1610c16b537SWarner Losh } } 1620c16b537SWarner Losh } 1630c16b537SWarner Losh 1640c16b537SWarner Losh void POOL_free(POOL_ctx *ctx) { 1650c16b537SWarner Losh if (!ctx) { return; } 1660c16b537SWarner Losh POOL_join(ctx); 1670c16b537SWarner Losh ZSTD_pthread_mutex_destroy(&ctx->queueMutex); 1680c16b537SWarner Losh ZSTD_pthread_cond_destroy(&ctx->queuePushCond); 1690c16b537SWarner Losh ZSTD_pthread_cond_destroy(&ctx->queuePopCond); 1700c16b537SWarner Losh ZSTD_free(ctx->queue, ctx->customMem); 1710c16b537SWarner Losh ZSTD_free(ctx->threads, ctx->customMem); 1720c16b537SWarner Losh ZSTD_free(ctx, ctx->customMem); 1730c16b537SWarner Losh } 1740c16b537SWarner Losh 1750c16b537SWarner Losh size_t POOL_sizeof(POOL_ctx *ctx) { 1760c16b537SWarner Losh if (ctx==NULL) return 0; /* supports sizeof NULL */ 1770c16b537SWarner Losh return sizeof(*ctx) 1780c16b537SWarner Losh + ctx->queueSize * sizeof(POOL_job) 1790c16b537SWarner Losh + ctx->numThreads * sizeof(ZSTD_pthread_t); 1800c16b537SWarner Losh } 1810c16b537SWarner Losh 1820c16b537SWarner Losh /** 1830c16b537SWarner Losh * Returns 1 if the queue is full and 0 otherwise. 1840c16b537SWarner Losh * 1850c16b537SWarner Losh * If the queueSize is 1 (the pool was created with an intended queueSize of 0), 1860c16b537SWarner Losh * then a queue is empty if there is a thread free and no job is waiting. 1870c16b537SWarner Losh */ 1880c16b537SWarner Losh static int isQueueFull(POOL_ctx const* ctx) { 1890c16b537SWarner Losh if (ctx->queueSize > 1) { 1900c16b537SWarner Losh return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize); 1910c16b537SWarner Losh } else { 1920c16b537SWarner Losh return ctx->numThreadsBusy == ctx->numThreads || 1930c16b537SWarner Losh !ctx->queueEmpty; 1940c16b537SWarner Losh } 1950c16b537SWarner Losh } 1960c16b537SWarner Losh 1970c16b537SWarner Losh 198*19fcbaf1SConrad Meyer static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque) 199*19fcbaf1SConrad Meyer { 200*19fcbaf1SConrad Meyer POOL_job const job = {function, opaque}; 201*19fcbaf1SConrad Meyer assert(ctx != NULL); 202*19fcbaf1SConrad Meyer if (ctx->shutdown) return; 2030c16b537SWarner Losh 2040c16b537SWarner Losh ctx->queueEmpty = 0; 2050c16b537SWarner Losh ctx->queue[ctx->queueTail] = job; 2060c16b537SWarner Losh ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize; 2070c16b537SWarner Losh ZSTD_pthread_cond_signal(&ctx->queuePopCond); 2080c16b537SWarner Losh } 2090c16b537SWarner Losh 210*19fcbaf1SConrad Meyer void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) 211*19fcbaf1SConrad Meyer { 212*19fcbaf1SConrad Meyer assert(ctx != NULL); 213*19fcbaf1SConrad Meyer ZSTD_pthread_mutex_lock(&ctx->queueMutex); 214*19fcbaf1SConrad Meyer /* Wait until there is space in the queue for the new job */ 215*19fcbaf1SConrad Meyer while (isQueueFull(ctx) && (!ctx->shutdown)) { 216*19fcbaf1SConrad Meyer ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); 217*19fcbaf1SConrad Meyer } 218*19fcbaf1SConrad Meyer POOL_add_internal(ctx, function, opaque); 219*19fcbaf1SConrad Meyer ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 220*19fcbaf1SConrad Meyer } 2210c16b537SWarner Losh 222*19fcbaf1SConrad Meyer 223*19fcbaf1SConrad Meyer int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) 224*19fcbaf1SConrad Meyer { 225*19fcbaf1SConrad Meyer assert(ctx != NULL); 226*19fcbaf1SConrad Meyer ZSTD_pthread_mutex_lock(&ctx->queueMutex); 227*19fcbaf1SConrad Meyer if (isQueueFull(ctx)) { 228*19fcbaf1SConrad Meyer ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 229*19fcbaf1SConrad Meyer return 0; 230*19fcbaf1SConrad Meyer } 231*19fcbaf1SConrad Meyer POOL_add_internal(ctx, function, opaque); 232*19fcbaf1SConrad Meyer ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 233*19fcbaf1SConrad Meyer return 1; 234*19fcbaf1SConrad Meyer } 235*19fcbaf1SConrad Meyer 236*19fcbaf1SConrad Meyer 237*19fcbaf1SConrad Meyer #else /* ZSTD_MULTITHREAD not defined */ 238*19fcbaf1SConrad Meyer 239*19fcbaf1SConrad Meyer /* ========================== */ 240*19fcbaf1SConrad Meyer /* No multi-threading support */ 241*19fcbaf1SConrad Meyer /* ========================== */ 242*19fcbaf1SConrad Meyer 243*19fcbaf1SConrad Meyer 244*19fcbaf1SConrad Meyer /* We don't need any data, but if it is empty, malloc() might return NULL. */ 2450c16b537SWarner Losh struct POOL_ctx_s { 2460c16b537SWarner Losh int dummy; 2470c16b537SWarner Losh }; 2480c16b537SWarner Losh static POOL_ctx g_ctx; 2490c16b537SWarner Losh 2500c16b537SWarner Losh POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { 2510c16b537SWarner Losh return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem); 2520c16b537SWarner Losh } 2530c16b537SWarner Losh 2540c16b537SWarner Losh POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) { 2550c16b537SWarner Losh (void)numThreads; 2560c16b537SWarner Losh (void)queueSize; 2570c16b537SWarner Losh (void)customMem; 2580c16b537SWarner Losh return &g_ctx; 2590c16b537SWarner Losh } 2600c16b537SWarner Losh 2610c16b537SWarner Losh void POOL_free(POOL_ctx* ctx) { 2620c16b537SWarner Losh assert(!ctx || ctx == &g_ctx); 2630c16b537SWarner Losh (void)ctx; 2640c16b537SWarner Losh } 2650c16b537SWarner Losh 266*19fcbaf1SConrad Meyer void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) { 2670c16b537SWarner Losh (void)ctx; 2680c16b537SWarner Losh function(opaque); 2690c16b537SWarner Losh } 2700c16b537SWarner Losh 271*19fcbaf1SConrad Meyer int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) { 272*19fcbaf1SConrad Meyer (void)ctx; 273*19fcbaf1SConrad Meyer function(opaque); 274*19fcbaf1SConrad Meyer return 1; 275*19fcbaf1SConrad Meyer } 276*19fcbaf1SConrad Meyer 2770c16b537SWarner Losh size_t POOL_sizeof(POOL_ctx* ctx) { 2780c16b537SWarner Losh if (ctx==NULL) return 0; /* supports sizeof NULL */ 2790c16b537SWarner Losh assert(ctx == &g_ctx); 2800c16b537SWarner Losh return sizeof(*ctx); 2810c16b537SWarner Losh } 2820c16b537SWarner Losh 2830c16b537SWarner Losh #endif /* ZSTD_MULTITHREAD */ 284