xref: /freebsd/sys/contrib/zstd/lib/common/pool.c (revision 19fcbaf1424b464269f1a7621fab747bb75afc36)
/*
 * Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */
100c16b537SWarner Losh 
110c16b537SWarner Losh 
120c16b537SWarner Losh /* ======   Dependencies   ======= */
130c16b537SWarner Losh #include <stddef.h>  /* size_t */
140c16b537SWarner Losh #include "pool.h"
15*19fcbaf1SConrad Meyer #include "zstd_internal.h"  /* ZSTD_malloc, ZSTD_free */
160c16b537SWarner Losh 
170c16b537SWarner Losh /* ======   Compiler specifics   ====== */
180c16b537SWarner Losh #if defined(_MSC_VER)
190c16b537SWarner Losh #  pragma warning(disable : 4204)        /* disable: C4204: non-constant aggregate initializer */
200c16b537SWarner Losh #endif
210c16b537SWarner Losh 
220c16b537SWarner Losh 
230c16b537SWarner Losh #ifdef ZSTD_MULTITHREAD
240c16b537SWarner Losh 
250c16b537SWarner Losh #include "threading.h"   /* pthread adaptation */
260c16b537SWarner Losh 
270c16b537SWarner Losh /* A job is a function and an opaque argument */
280c16b537SWarner Losh typedef struct POOL_job_s {
290c16b537SWarner Losh     POOL_function function;
300c16b537SWarner Losh     void *opaque;
310c16b537SWarner Losh } POOL_job;
320c16b537SWarner Losh 
330c16b537SWarner Losh struct POOL_ctx_s {
340c16b537SWarner Losh     ZSTD_customMem customMem;
350c16b537SWarner Losh     /* Keep track of the threads */
360c16b537SWarner Losh     ZSTD_pthread_t *threads;
370c16b537SWarner Losh     size_t numThreads;
380c16b537SWarner Losh 
390c16b537SWarner Losh     /* The queue is a circular buffer */
400c16b537SWarner Losh     POOL_job *queue;
410c16b537SWarner Losh     size_t queueHead;
420c16b537SWarner Losh     size_t queueTail;
430c16b537SWarner Losh     size_t queueSize;
440c16b537SWarner Losh 
450c16b537SWarner Losh     /* The number of threads working on jobs */
460c16b537SWarner Losh     size_t numThreadsBusy;
470c16b537SWarner Losh     /* Indicates if the queue is empty */
480c16b537SWarner Losh     int queueEmpty;
490c16b537SWarner Losh 
500c16b537SWarner Losh     /* The mutex protects the queue */
510c16b537SWarner Losh     ZSTD_pthread_mutex_t queueMutex;
520c16b537SWarner Losh     /* Condition variable for pushers to wait on when the queue is full */
530c16b537SWarner Losh     ZSTD_pthread_cond_t queuePushCond;
540c16b537SWarner Losh     /* Condition variables for poppers to wait on when the queue is empty */
550c16b537SWarner Losh     ZSTD_pthread_cond_t queuePopCond;
560c16b537SWarner Losh     /* Indicates if the queue is shutting down */
570c16b537SWarner Losh     int shutdown;
580c16b537SWarner Losh };
590c16b537SWarner Losh 
600c16b537SWarner Losh /* POOL_thread() :
610c16b537SWarner Losh    Work thread for the thread pool.
620c16b537SWarner Losh    Waits for jobs and executes them.
630c16b537SWarner Losh    @returns : NULL on failure else non-null.
640c16b537SWarner Losh */
650c16b537SWarner Losh static void* POOL_thread(void* opaque) {
660c16b537SWarner Losh     POOL_ctx* const ctx = (POOL_ctx*)opaque;
670c16b537SWarner Losh     if (!ctx) { return NULL; }
680c16b537SWarner Losh     for (;;) {
690c16b537SWarner Losh         /* Lock the mutex and wait for a non-empty queue or until shutdown */
700c16b537SWarner Losh         ZSTD_pthread_mutex_lock(&ctx->queueMutex);
710c16b537SWarner Losh 
720c16b537SWarner Losh         while (ctx->queueEmpty && !ctx->shutdown) {
730c16b537SWarner Losh             ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
740c16b537SWarner Losh         }
750c16b537SWarner Losh         /* empty => shutting down: so stop */
760c16b537SWarner Losh         if (ctx->queueEmpty) {
770c16b537SWarner Losh             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
780c16b537SWarner Losh             return opaque;
790c16b537SWarner Losh         }
800c16b537SWarner Losh         /* Pop a job off the queue */
810c16b537SWarner Losh         {   POOL_job const job = ctx->queue[ctx->queueHead];
820c16b537SWarner Losh             ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
830c16b537SWarner Losh             ctx->numThreadsBusy++;
840c16b537SWarner Losh             ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
850c16b537SWarner Losh             /* Unlock the mutex, signal a pusher, and run the job */
860c16b537SWarner Losh             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
870c16b537SWarner Losh             ZSTD_pthread_cond_signal(&ctx->queuePushCond);
880c16b537SWarner Losh 
890c16b537SWarner Losh             job.function(job.opaque);
900c16b537SWarner Losh 
910c16b537SWarner Losh             /* If the intended queue size was 0, signal after finishing job */
920c16b537SWarner Losh             if (ctx->queueSize == 1) {
930c16b537SWarner Losh                 ZSTD_pthread_mutex_lock(&ctx->queueMutex);
940c16b537SWarner Losh                 ctx->numThreadsBusy--;
950c16b537SWarner Losh                 ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
960c16b537SWarner Losh                 ZSTD_pthread_cond_signal(&ctx->queuePushCond);
970c16b537SWarner Losh         }   }
980c16b537SWarner Losh     }  /* for (;;) */
990c16b537SWarner Losh     /* Unreachable */
1000c16b537SWarner Losh }
1010c16b537SWarner Losh 
1020c16b537SWarner Losh POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
1030c16b537SWarner Losh     return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
1040c16b537SWarner Losh }
1050c16b537SWarner Losh 
1060c16b537SWarner Losh POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
1070c16b537SWarner Losh     POOL_ctx* ctx;
1080c16b537SWarner Losh     /* Check the parameters */
1090c16b537SWarner Losh     if (!numThreads) { return NULL; }
1100c16b537SWarner Losh     /* Allocate the context and zero initialize */
1110c16b537SWarner Losh     ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem);
1120c16b537SWarner Losh     if (!ctx) { return NULL; }
1130c16b537SWarner Losh     /* Initialize the job queue.
1140c16b537SWarner Losh      * It needs one extra space since one space is wasted to differentiate empty
1150c16b537SWarner Losh      * and full queues.
1160c16b537SWarner Losh      */
1170c16b537SWarner Losh     ctx->queueSize = queueSize + 1;
118052d3c12SConrad Meyer     ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem);
1190c16b537SWarner Losh     ctx->queueHead = 0;
1200c16b537SWarner Losh     ctx->queueTail = 0;
1210c16b537SWarner Losh     ctx->numThreadsBusy = 0;
1220c16b537SWarner Losh     ctx->queueEmpty = 1;
1230c16b537SWarner Losh     (void)ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
1240c16b537SWarner Losh     (void)ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
1250c16b537SWarner Losh     (void)ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
1260c16b537SWarner Losh     ctx->shutdown = 0;
1270c16b537SWarner Losh     /* Allocate space for the thread handles */
1280c16b537SWarner Losh     ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
1290c16b537SWarner Losh     ctx->numThreads = 0;
1300c16b537SWarner Losh     ctx->customMem = customMem;
1310c16b537SWarner Losh     /* Check for errors */
1320c16b537SWarner Losh     if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
1330c16b537SWarner Losh     /* Initialize the threads */
1340c16b537SWarner Losh     {   size_t i;
1350c16b537SWarner Losh         for (i = 0; i < numThreads; ++i) {
1360c16b537SWarner Losh             if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
1370c16b537SWarner Losh                 ctx->numThreads = i;
1380c16b537SWarner Losh                 POOL_free(ctx);
1390c16b537SWarner Losh                 return NULL;
1400c16b537SWarner Losh         }   }
1410c16b537SWarner Losh         ctx->numThreads = numThreads;
1420c16b537SWarner Losh     }
1430c16b537SWarner Losh     return ctx;
1440c16b537SWarner Losh }
1450c16b537SWarner Losh 
1460c16b537SWarner Losh /*! POOL_join() :
1470c16b537SWarner Losh     Shutdown the queue, wake any sleeping threads, and join all of the threads.
1480c16b537SWarner Losh */
1490c16b537SWarner Losh static void POOL_join(POOL_ctx* ctx) {
1500c16b537SWarner Losh     /* Shut down the queue */
1510c16b537SWarner Losh     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
1520c16b537SWarner Losh     ctx->shutdown = 1;
1530c16b537SWarner Losh     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
1540c16b537SWarner Losh     /* Wake up sleeping threads */
1550c16b537SWarner Losh     ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
1560c16b537SWarner Losh     ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
1570c16b537SWarner Losh     /* Join all of the threads */
1580c16b537SWarner Losh     {   size_t i;
1590c16b537SWarner Losh         for (i = 0; i < ctx->numThreads; ++i) {
1600c16b537SWarner Losh             ZSTD_pthread_join(ctx->threads[i], NULL);
1610c16b537SWarner Losh     }   }
1620c16b537SWarner Losh }
1630c16b537SWarner Losh 
1640c16b537SWarner Losh void POOL_free(POOL_ctx *ctx) {
1650c16b537SWarner Losh     if (!ctx) { return; }
1660c16b537SWarner Losh     POOL_join(ctx);
1670c16b537SWarner Losh     ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
1680c16b537SWarner Losh     ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
1690c16b537SWarner Losh     ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
1700c16b537SWarner Losh     ZSTD_free(ctx->queue, ctx->customMem);
1710c16b537SWarner Losh     ZSTD_free(ctx->threads, ctx->customMem);
1720c16b537SWarner Losh     ZSTD_free(ctx, ctx->customMem);
1730c16b537SWarner Losh }
1740c16b537SWarner Losh 
1750c16b537SWarner Losh size_t POOL_sizeof(POOL_ctx *ctx) {
1760c16b537SWarner Losh     if (ctx==NULL) return 0;  /* supports sizeof NULL */
1770c16b537SWarner Losh     return sizeof(*ctx)
1780c16b537SWarner Losh         + ctx->queueSize * sizeof(POOL_job)
1790c16b537SWarner Losh         + ctx->numThreads * sizeof(ZSTD_pthread_t);
1800c16b537SWarner Losh }
1810c16b537SWarner Losh 
1820c16b537SWarner Losh /**
1830c16b537SWarner Losh  * Returns 1 if the queue is full and 0 otherwise.
1840c16b537SWarner Losh  *
1850c16b537SWarner Losh  * If the queueSize is 1 (the pool was created with an intended queueSize of 0),
1860c16b537SWarner Losh  * then a queue is empty if there is a thread free and no job is waiting.
1870c16b537SWarner Losh  */
1880c16b537SWarner Losh static int isQueueFull(POOL_ctx const* ctx) {
1890c16b537SWarner Losh     if (ctx->queueSize > 1) {
1900c16b537SWarner Losh         return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
1910c16b537SWarner Losh     } else {
1920c16b537SWarner Losh         return ctx->numThreadsBusy == ctx->numThreads ||
1930c16b537SWarner Losh                !ctx->queueEmpty;
1940c16b537SWarner Losh     }
1950c16b537SWarner Losh }
1960c16b537SWarner Losh 
1970c16b537SWarner Losh 
198*19fcbaf1SConrad Meyer static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
199*19fcbaf1SConrad Meyer {
200*19fcbaf1SConrad Meyer     POOL_job const job = {function, opaque};
201*19fcbaf1SConrad Meyer     assert(ctx != NULL);
202*19fcbaf1SConrad Meyer     if (ctx->shutdown) return;
2030c16b537SWarner Losh 
2040c16b537SWarner Losh     ctx->queueEmpty = 0;
2050c16b537SWarner Losh     ctx->queue[ctx->queueTail] = job;
2060c16b537SWarner Losh     ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
2070c16b537SWarner Losh     ZSTD_pthread_cond_signal(&ctx->queuePopCond);
2080c16b537SWarner Losh }
2090c16b537SWarner Losh 
210*19fcbaf1SConrad Meyer void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
211*19fcbaf1SConrad Meyer {
212*19fcbaf1SConrad Meyer     assert(ctx != NULL);
213*19fcbaf1SConrad Meyer     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
214*19fcbaf1SConrad Meyer     /* Wait until there is space in the queue for the new job */
215*19fcbaf1SConrad Meyer     while (isQueueFull(ctx) && (!ctx->shutdown)) {
216*19fcbaf1SConrad Meyer         ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
217*19fcbaf1SConrad Meyer     }
218*19fcbaf1SConrad Meyer     POOL_add_internal(ctx, function, opaque);
219*19fcbaf1SConrad Meyer     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
220*19fcbaf1SConrad Meyer }
2210c16b537SWarner Losh 
222*19fcbaf1SConrad Meyer 
223*19fcbaf1SConrad Meyer int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
224*19fcbaf1SConrad Meyer {
225*19fcbaf1SConrad Meyer     assert(ctx != NULL);
226*19fcbaf1SConrad Meyer     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
227*19fcbaf1SConrad Meyer     if (isQueueFull(ctx)) {
228*19fcbaf1SConrad Meyer         ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
229*19fcbaf1SConrad Meyer         return 0;
230*19fcbaf1SConrad Meyer     }
231*19fcbaf1SConrad Meyer     POOL_add_internal(ctx, function, opaque);
232*19fcbaf1SConrad Meyer     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
233*19fcbaf1SConrad Meyer     return 1;
234*19fcbaf1SConrad Meyer }
235*19fcbaf1SConrad Meyer 
236*19fcbaf1SConrad Meyer 
237*19fcbaf1SConrad Meyer #else  /* ZSTD_MULTITHREAD  not defined */
238*19fcbaf1SConrad Meyer 
239*19fcbaf1SConrad Meyer /* ========================== */
240*19fcbaf1SConrad Meyer /* No multi-threading support */
241*19fcbaf1SConrad Meyer /* ========================== */
242*19fcbaf1SConrad Meyer 
243*19fcbaf1SConrad Meyer 
244*19fcbaf1SConrad Meyer /* We don't need any data, but if it is empty, malloc() might return NULL. */
2450c16b537SWarner Losh struct POOL_ctx_s {
2460c16b537SWarner Losh     int dummy;
2470c16b537SWarner Losh };
2480c16b537SWarner Losh static POOL_ctx g_ctx;
2490c16b537SWarner Losh 
2500c16b537SWarner Losh POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
2510c16b537SWarner Losh     return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
2520c16b537SWarner Losh }
2530c16b537SWarner Losh 
2540c16b537SWarner Losh POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
2550c16b537SWarner Losh     (void)numThreads;
2560c16b537SWarner Losh     (void)queueSize;
2570c16b537SWarner Losh     (void)customMem;
2580c16b537SWarner Losh     return &g_ctx;
2590c16b537SWarner Losh }
2600c16b537SWarner Losh 
2610c16b537SWarner Losh void POOL_free(POOL_ctx* ctx) {
2620c16b537SWarner Losh     assert(!ctx || ctx == &g_ctx);
2630c16b537SWarner Losh     (void)ctx;
2640c16b537SWarner Losh }
2650c16b537SWarner Losh 
266*19fcbaf1SConrad Meyer void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
2670c16b537SWarner Losh     (void)ctx;
2680c16b537SWarner Losh     function(opaque);
2690c16b537SWarner Losh }
2700c16b537SWarner Losh 
271*19fcbaf1SConrad Meyer int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
272*19fcbaf1SConrad Meyer     (void)ctx;
273*19fcbaf1SConrad Meyer     function(opaque);
274*19fcbaf1SConrad Meyer     return 1;
275*19fcbaf1SConrad Meyer }
276*19fcbaf1SConrad Meyer 
2770c16b537SWarner Losh size_t POOL_sizeof(POOL_ctx* ctx) {
2780c16b537SWarner Losh     if (ctx==NULL) return 0;  /* supports sizeof NULL */
2790c16b537SWarner Losh     assert(ctx == &g_ctx);
2800c16b537SWarner Losh     return sizeof(*ctx);
2810c16b537SWarner Losh }
2820c16b537SWarner Losh 
2830c16b537SWarner Losh #endif  /* ZSTD_MULTITHREAD */
284