xref: /freebsd/sys/contrib/openzfs/module/zstd/lib/common/pool.c (revision 61145dc2b94f12f6a47344fb9aac702321880e43)
1 // SPDX-License-Identifier: BSD-3-Clause OR GPL-2.0-only
2 /*
3  * Copyright (c) 2016-2020, Yann Collet, Facebook, Inc.
4  * All rights reserved.
5  *
6  * This source code is licensed under both the BSD-style license (found in the
7  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
8  * in the COPYING file in the root directory of this source tree).
9  * You may select, at your option, one of the above-listed licenses.
10  */
11 
12 
13 /* ======   Dependencies   ======= */
14 #include <stddef.h>    /* size_t */
15 #include "debug.h"     /* assert */
16 #include "zstd_internal.h"  /* ZSTD_malloc, ZSTD_free */
17 #include "pool.h"
18 
19 /* ======   Compiler specifics   ====== */
20 #if defined(_MSC_VER)
21 #  pragma warning(disable : 4204)        /* disable: C4204: non-constant aggregate initializer */
22 #endif
23 
24 
25 #ifdef ZSTD_MULTITHREAD
26 
27 #include "threading.h"   /* pthread adaptation */
28 
29 /* A job is a function and an opaque argument */
30 typedef struct POOL_job_s {
31     POOL_function function;
32     void *opaque;
33 } POOL_job;
34 
35 struct POOL_ctx_s {
36     ZSTD_customMem customMem;
37     /* Keep track of the threads */
38     ZSTD_pthread_t* threads;
39     size_t threadCapacity;
40     size_t threadLimit;
41 
42     /* The queue is a circular buffer */
43     POOL_job *queue;
44     size_t queueHead;
45     size_t queueTail;
46     size_t queueSize;
47 
48     /* The number of threads working on jobs */
49     size_t numThreadsBusy;
50     /* Indicates if the queue is empty */
51     int queueEmpty;
52 
53     /* The mutex protects the queue */
54     ZSTD_pthread_mutex_t queueMutex;
55     /* Condition variable for pushers to wait on when the queue is full */
56     ZSTD_pthread_cond_t queuePushCond;
57     /* Condition variables for poppers to wait on when the queue is empty */
58     ZSTD_pthread_cond_t queuePopCond;
59     /* Indicates if the queue is shutting down */
60     int shutdown;
61 };
62 
63 /* POOL_thread() :
64  * Work thread for the thread pool.
65  * Waits for jobs and executes them.
66  * @returns : NULL on failure else non-null.
67  */
POOL_thread(void * opaque)68 static void* POOL_thread(void* opaque) {
69     POOL_ctx* const ctx = (POOL_ctx*)opaque;
70     if (!ctx) { return NULL; }
71     for (;;) {
72         /* Lock the mutex and wait for a non-empty queue or until shutdown */
73         ZSTD_pthread_mutex_lock(&ctx->queueMutex);
74 
75         while ( ctx->queueEmpty
76             || (ctx->numThreadsBusy >= ctx->threadLimit) ) {
77             if (ctx->shutdown) {
78                 /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
79                  * a few threads will be shutdown while !queueEmpty,
80                  * but enough threads will remain active to finish the queue */
81                 ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
82                 return opaque;
83             }
84             ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
85         }
86         /* Pop a job off the queue */
87         {   POOL_job const job = ctx->queue[ctx->queueHead];
88             ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
89             ctx->numThreadsBusy++;
90             ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
91             /* Unlock the mutex, signal a pusher, and run the job */
92             ZSTD_pthread_cond_signal(&ctx->queuePushCond);
93             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
94 
95             job.function(job.opaque);
96 
97             /* If the intended queue size was 0, signal after finishing job */
98             ZSTD_pthread_mutex_lock(&ctx->queueMutex);
99             ctx->numThreadsBusy--;
100             if (ctx->queueSize == 1) {
101                 ZSTD_pthread_cond_signal(&ctx->queuePushCond);
102             }
103             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
104         }
105     }  /* for (;;) */
106     assert(0);  /* Unreachable */
107 }
108 
POOL_create(size_t numThreads,size_t queueSize)109 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
110     return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
111 }
112 
POOL_create_advanced(size_t numThreads,size_t queueSize,ZSTD_customMem customMem)113 POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
114                                ZSTD_customMem customMem) {
115     POOL_ctx* ctx;
116     /* Check parameters */
117     if (!numThreads) { return NULL; }
118     /* Allocate the context and zero initialize */
119     ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem);
120     if (!ctx) { return NULL; }
121     /* Initialize the job queue.
122      * It needs one extra space since one space is wasted to differentiate
123      * empty and full queues.
124      */
125     ctx->queueSize = queueSize + 1;
126     ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem);
127     ctx->queueHead = 0;
128     ctx->queueTail = 0;
129     ctx->numThreadsBusy = 0;
130     ctx->queueEmpty = 1;
131     {
132         int error = 0;
133         error |= ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
134         error |= ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
135         error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
136         if (error) { POOL_free(ctx); return NULL; }
137     }
138     ctx->shutdown = 0;
139     /* Allocate space for the thread handles */
140     ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
141     ctx->threadCapacity = 0;
142     ctx->customMem = customMem;
143     /* Check for errors */
144     if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
145     /* Initialize the threads */
146     {   size_t i;
147         for (i = 0; i < numThreads; ++i) {
148             if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
149                 ctx->threadCapacity = i;
150                 POOL_free(ctx);
151                 return NULL;
152         }   }
153         ctx->threadCapacity = numThreads;
154         ctx->threadLimit = numThreads;
155     }
156     return ctx;
157 }
158 
159 /*! POOL_join() :
160     Shutdown the queue, wake any sleeping threads, and join all of the threads.
161 */
POOL_join(POOL_ctx * ctx)162 static void POOL_join(POOL_ctx* ctx) {
163     /* Shut down the queue */
164     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
165     ctx->shutdown = 1;
166     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
167     /* Wake up sleeping threads */
168     ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
169     ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
170     /* Join all of the threads */
171     {   size_t i;
172         for (i = 0; i < ctx->threadCapacity; ++i) {
173             ZSTD_pthread_join(ctx->threads[i], NULL);  /* note : could fail */
174     }   }
175 }
176 
/*! POOL_free() :
 *  Joins the pool's threads, destroys its mutex and condition variables,
 *  and releases the queue, the thread array and the context itself.
 *  Accepts NULL, in which case nothing is done.
 */
void POOL_free(POOL_ctx *ctx) {
    if (ctx != NULL) {
        /* Copy the allocator : it is still needed to free `ctx` itself */
        ZSTD_customMem const cMem = ctx->customMem;

        POOL_join(ctx);
        ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
        ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
        ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
        ZSTD_free(ctx->queue, cMem);
        ZSTD_free(ctx->threads, cMem);
        ZSTD_free(ctx, cMem);
    }
}
187 
188 
189 
/*! POOL_sizeof() :
 * @return : size of the context structure plus its job queue
 *           (`queueSize` entries) and its thread array (`threadCapacity`
 *           entries); 0 when `ctx` is NULL.
 */
size_t POOL_sizeof(POOL_ctx *ctx) {
    size_t total;
    if (ctx == NULL) { return 0; }  /* supports sizeof NULL */
    total  = sizeof(*ctx);
    total += ctx->queueSize * sizeof(POOL_job);
    total += ctx->threadCapacity * sizeof(ZSTD_pthread_t);
    return total;
}
196 
197 
198 /* @return : 0 on success, 1 on error */
POOL_resize_internal(POOL_ctx * ctx,size_t numThreads)199 static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
200 {
201     if (numThreads <= ctx->threadCapacity) {
202         if (!numThreads) return 1;
203         ctx->threadLimit = numThreads;
204         return 0;
205     }
206     /* numThreads > threadCapacity */
207     {   ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);
208         if (!threadPool) return 1;
209         /* replace existing thread pool */
210         memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool));
211         ZSTD_free(ctx->threads, ctx->customMem);
212         ctx->threads = threadPool;
213         /* Initialize additional threads */
214         {   size_t threadId;
215             for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
216                 if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) {
217                     ctx->threadCapacity = threadId;
218                     return 1;
219             }   }
220     }   }
221     /* successfully expanded */
222     ctx->threadCapacity = numThreads;
223     ctx->threadLimit = numThreads;
224     return 0;
225 }
226 
227 /* @return : 0 on success, 1 on error */
POOL_resize(POOL_ctx * ctx,size_t numThreads)228 int POOL_resize(POOL_ctx* ctx, size_t numThreads)
229 {
230     int result;
231     if (ctx==NULL) return 1;
232     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
233     result = POOL_resize_internal(ctx, numThreads);
234     ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
235     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
236     return result;
237 }
238 
239 /**
240  * Returns 1 if the queue is full and 0 otherwise.
241  *
242  * When queueSize is 1 (pool was created with an intended queueSize of 0),
243  * then a queue is empty if there is a thread free _and_ no job is waiting.
244  */
isQueueFull(POOL_ctx const * ctx)245 static int isQueueFull(POOL_ctx const* ctx) {
246     if (ctx->queueSize > 1) {
247         return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
248     } else {
249         return (ctx->numThreadsBusy == ctx->threadLimit) ||
250                !ctx->queueEmpty;
251     }
252 }
253 
254 
/* POOL_add_internal() :
 * Stores the job (`function`, `opaque`) at the tail of the queue and signals
 * one waiting worker. Does nothing once the pool is shutting down.
 * Callers hold the queue mutex and have checked that a slot is available.
 */
static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
{
    POOL_job* slot;
    assert(ctx != NULL);
    if (ctx->shutdown) { return; }

    ctx->queueEmpty = 0;
    slot = &ctx->queue[ctx->queueTail];
    slot->function = function;
    slot->opaque = opaque;
    ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
    ZSTD_pthread_cond_signal(&ctx->queuePopCond);
}
266 
/*! POOL_add() :
 *  Queues the job (`function`, `opaque`) for execution by the pool.
 *  Blocks while the queue is full, unless the pool is shutting down.
 *  `ctx` must not be NULL.
 */
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
{
    assert(ctx != NULL);
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    /* Wait until there is space in the queue for the new job */
    for (;;) {
        if (!isQueueFull(ctx) || ctx->shutdown) { break; }
        ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
    }
    POOL_add_internal(ctx, function, opaque);
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
}
278 
279 
/*! POOL_tryAdd() :
 *  Non-blocking variant of POOL_add() : hands the job (`function`, `opaque`)
 *  to the queue only if the queue is not full.
 *  `ctx` must not be NULL.
 * @return : 1 if the queue was not full and the job was passed on,
 *           0 if the queue was full.
 */
int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
{
    int accepted;
    assert(ctx != NULL);
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    accepted = !isQueueFull(ctx);
    if (accepted) {
        POOL_add_internal(ctx, function, opaque);
    }
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
    return accepted;
}
292 
293 
294 #else  /* ZSTD_MULTITHREAD  not defined */
295 
296 /* ========================== */
297 /* No multi-threading support */
298 /* ========================== */
299 
300 
301 /* We don't need any data, but if it is empty, malloc() might return NULL. */
302 struct POOL_ctx_s {
303     int dummy;
304 };
305 static POOL_ctx g_ctx;
306 
POOL_create(size_t numThreads,size_t queueSize)307 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
308     return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
309 }
310 
POOL_create_advanced(size_t numThreads,size_t queueSize,ZSTD_customMem customMem)311 POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
312     (void)numThreads;
313     (void)queueSize;
314     (void)customMem;
315     return &g_ctx;
316 }
317 
/*! POOL_free() :
 *  Single-threaded build : releases nothing. Only asserts that `ctx` is
 *  either NULL or the file-level context `g_ctx`.
 */
void POOL_free(POOL_ctx* ctx) {
    assert(ctx == NULL || ctx == &g_ctx);
    (void)ctx;   /* unused when assert() is compiled out */
}
322 
/*! POOL_resize() :
 *  Single-threaded build : both arguments are ignored.
 * @return : always 0.
 */
int POOL_resize(POOL_ctx* ctx, size_t numThreads) {
    (void)numThreads;
    (void)ctx;
    return 0;
}
327 
/*! POOL_add() :
 *  Single-threaded build : runs `function(opaque)` immediately, in the
 *  calling thread. `ctx` is ignored.
 */
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
    (void)ctx;
    (*function)(opaque);
}
332 
/*! POOL_tryAdd() :
 *  Single-threaded build : runs `function(opaque)` immediately, in the
 *  calling thread. `ctx` is ignored.
 * @return : always 1.
 */
int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
    (void)ctx;
    (*function)(opaque);
    return 1;
}
338 
/*! POOL_sizeof() :
 * @return : 0 when `ctx` is NULL, else the size of the context structure.
 *           A non-NULL `ctx` is asserted to be the file-level `g_ctx`.
 */
size_t POOL_sizeof(POOL_ctx* ctx) {
    if (ctx != NULL) {
        assert(ctx == &g_ctx);
        return sizeof(*ctx);
    }
    return 0;  /* supports sizeof NULL */
}
344 
345 #endif  /* ZSTD_MULTITHREAD */
346