1 // SPDX-License-Identifier: BSD-3-Clause OR GPL-2.0-only
2 /*
3 * Copyright (c) 2016-2020, Yann Collet, Facebook, Inc.
4 * All rights reserved.
5 *
6 * This source code is licensed under both the BSD-style license (found in the
7 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
8 * in the COPYING file in the root directory of this source tree).
9 * You may select, at your option, one of the above-listed licenses.
10 */
11
12
13 /* ====== Dependencies ======= */
14 #include <stddef.h> /* size_t */
15 #include "debug.h" /* assert */
16 #include "zstd_internal.h" /* ZSTD_malloc, ZSTD_free */
17 #include "pool.h"
18
19 /* ====== Compiler specifics ====== */
20 #if defined(_MSC_VER)
21 # pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */
22 #endif
23
24
25 #ifdef ZSTD_MULTITHREAD
26
27 #include "threading.h" /* pthread adaptation */
28
/* A job is a function and an opaque argument, executed by a worker thread */
typedef struct POOL_job_s {
    POOL_function function;   /* routine to run */
    void *opaque;             /* argument passed verbatim to function */
} POOL_job;
34
/* Shared state of a thread pool.
 * All mutable fields are protected by queueMutex. */
struct POOL_ctx_s {
    ZSTD_customMem customMem;   /* allocator used for ctx, queue and threads */
    /* Keep track of the threads */
    ZSTD_pthread_t* threads;    /* handle array; first threadCapacity entries are valid */
    size_t threadCapacity;      /* number of threads actually created (joined at shutdown) */
    size_t threadLimit;         /* max threads allowed to run jobs concurrently (<= capacity) */

    /* The queue is a circular buffer */
    POOL_job *queue;
    size_t queueHead;           /* index of next job to pop */
    size_t queueTail;           /* index of next free slot to push into */
    size_t queueSize;           /* allocated slots = requested queue size + 1 */

    /* The number of threads working on jobs */
    size_t numThreadsBusy;
    /* Indicates if the queue is empty
     * (needed because head == tail alone cannot distinguish empty from full) */
    int queueEmpty;

    /* The mutex protects the queue and all counters above */
    ZSTD_pthread_mutex_t queueMutex;
    /* Condition variable for pushers to wait on when the queue is full */
    ZSTD_pthread_cond_t queuePushCond;
    /* Condition variable for poppers to wait on when the queue is empty */
    ZSTD_pthread_cond_t queuePopCond;
    /* Indicates if the queue is shutting down */
    int shutdown;
};
62
/* POOL_thread() :
 * Work thread for the thread pool.
 * Waits for jobs and executes them.
 * @returns : NULL on failure else non-null.
 */
static void* POOL_thread(void* opaque) {
    POOL_ctx* const ctx = (POOL_ctx*)opaque;
    if (!ctx) { return NULL; }
    for (;;) {
        /* Lock the mutex and wait for a non-empty queue or until shutdown */
        ZSTD_pthread_mutex_lock(&ctx->queueMutex);

        /* Also sleep while the active-thread quota is reached : after a
         * shrinking POOL_resize(), excess workers stay parked here. */
        while ( ctx->queueEmpty
            || (ctx->numThreadsBusy >= ctx->threadLimit) ) {
            if (ctx->shutdown) {
                /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
                 * a few threads will be shutdown while !queueEmpty,
                 * but enough threads will remain active to finish the queue */
                ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
                return opaque;
            }
            ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
        }
        /* Pop a job off the queue */
        {   POOL_job const job = ctx->queue[ctx->queueHead];
            ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
            ctx->numThreadsBusy++;
            ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
            /* Unlock the mutex, signal a pusher, and run the job */
            ZSTD_pthread_cond_signal(&ctx->queuePushCond);
            ZSTD_pthread_mutex_unlock(&ctx->queueMutex);

            job.function(job.opaque);   /* executed outside the lock */

            /* If the intended queue size was 0, signal after finishing job :
             * in that mode pushers wait for an idle worker, not a free slot. */
            ZSTD_pthread_mutex_lock(&ctx->queueMutex);
            ctx->numThreadsBusy--;
            if (ctx->queueSize == 1) {
                ZSTD_pthread_cond_signal(&ctx->queuePushCond);
            }
            ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
        }
    }  /* for (;;) */
    assert(0);  /* Unreachable */
}
108
/*! POOL_create() :
 *  Create a pool of `numThreads` workers with a job queue of intended size
 *  `queueSize`, using default memory allocators.
 * @return : the new pool, or NULL on failure (see POOL_create_advanced()) */
POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
    return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
}
112
POOL_create_advanced(size_t numThreads,size_t queueSize,ZSTD_customMem customMem)113 POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
114 ZSTD_customMem customMem) {
115 POOL_ctx* ctx;
116 /* Check parameters */
117 if (!numThreads) { return NULL; }
118 /* Allocate the context and zero initialize */
119 ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem);
120 if (!ctx) { return NULL; }
121 /* Initialize the job queue.
122 * It needs one extra space since one space is wasted to differentiate
123 * empty and full queues.
124 */
125 ctx->queueSize = queueSize + 1;
126 ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem);
127 ctx->queueHead = 0;
128 ctx->queueTail = 0;
129 ctx->numThreadsBusy = 0;
130 ctx->queueEmpty = 1;
131 {
132 int error = 0;
133 error |= ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
134 error |= ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
135 error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
136 if (error) { POOL_free(ctx); return NULL; }
137 }
138 ctx->shutdown = 0;
139 /* Allocate space for the thread handles */
140 ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
141 ctx->threadCapacity = 0;
142 ctx->customMem = customMem;
143 /* Check for errors */
144 if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
145 /* Initialize the threads */
146 { size_t i;
147 for (i = 0; i < numThreads; ++i) {
148 if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
149 ctx->threadCapacity = i;
150 POOL_free(ctx);
151 return NULL;
152 } }
153 ctx->threadCapacity = numThreads;
154 ctx->threadLimit = numThreads;
155 }
156 return ctx;
157 }
158
159 /*! POOL_join() :
160 Shutdown the queue, wake any sleeping threads, and join all of the threads.
161 */
POOL_join(POOL_ctx * ctx)162 static void POOL_join(POOL_ctx* ctx) {
163 /* Shut down the queue */
164 ZSTD_pthread_mutex_lock(&ctx->queueMutex);
165 ctx->shutdown = 1;
166 ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
167 /* Wake up sleeping threads */
168 ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
169 ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
170 /* Join all of the threads */
171 { size_t i;
172 for (i = 0; i < ctx->threadCapacity; ++i) {
173 ZSTD_pthread_join(ctx->threads[i], NULL); /* note : could fail */
174 } }
175 }
176
/*! POOL_free() :
 *  Terminate all workers, then release every resource owned by the pool.
 *  Accepts NULL (no-op). */
void POOL_free(POOL_ctx *ctx) {
    if (ctx == NULL) return;
    POOL_join(ctx);                 /* stop and reap worker threads first */
    ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
    ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
    ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
    ZSTD_free(ctx->queue, ctx->customMem);
    ZSTD_free(ctx->threads, ctx->customMem);
    ZSTD_free(ctx, ctx->customMem);
}
187
188
189
/*! POOL_sizeof() :
 * @return : total memory footprint of the pool
 *           (context + job queue + thread handles), or 0 when ctx is NULL. */
size_t POOL_sizeof(POOL_ctx *ctx) {
    size_t total;
    if (ctx == NULL) return 0;  /* supports sizeof NULL */
    total  = sizeof(*ctx);
    total += ctx->queueSize * sizeof(POOL_job);
    total += ctx->threadCapacity * sizeof(ZSTD_pthread_t);
    return total;
}
196
197
/* POOL_resize_internal() :
 * Adjust the usable thread count; caller must hold queueMutex.
 * Shrinking only lowers threadLimit (surplus workers park in POOL_thread,
 * they are not joined); growing reallocates the handle array and starts
 * the missing threads.
 * @return : 0 on success, 1 on error */
static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
{
    if (numThreads <= ctx->threadCapacity) {
        if (!numThreads) return 1;   /* zero threads is invalid */
        /* enough threads already exist : just cap how many may run */
        ctx->threadLimit = numThreads;
        return 0;
    }
    /* numThreads > threadCapacity */
    {   ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);
        if (!threadPool) return 1;
        /* replace existing thread pool */
        memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool));
        ZSTD_free(ctx->threads, ctx->customMem);
        ctx->threads = threadPool;
        /* Initialize additional threads */
        {   size_t threadId;
            for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
                if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) {
                    /* partial failure : keep the threads that did start,
                     * record the new (smaller) capacity for later cleanup */
                    ctx->threadCapacity = threadId;
                    return 1;
            }   }
    }   }
    /* successfully expanded */
    ctx->threadCapacity = numThreads;
    ctx->threadLimit = numThreads;
    return 0;
}
226
227 /* @return : 0 on success, 1 on error */
POOL_resize(POOL_ctx * ctx,size_t numThreads)228 int POOL_resize(POOL_ctx* ctx, size_t numThreads)
229 {
230 int result;
231 if (ctx==NULL) return 1;
232 ZSTD_pthread_mutex_lock(&ctx->queueMutex);
233 result = POOL_resize_internal(ctx, numThreads);
234 ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
235 ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
236 return result;
237 }
238
239 /**
240 * Returns 1 if the queue is full and 0 otherwise.
241 *
242 * When queueSize is 1 (pool was created with an intended queueSize of 0),
243 * then a queue is empty if there is a thread free _and_ no job is waiting.
244 */
isQueueFull(POOL_ctx const * ctx)245 static int isQueueFull(POOL_ctx const* ctx) {
246 if (ctx->queueSize > 1) {
247 return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
248 } else {
249 return (ctx->numThreadsBusy == ctx->threadLimit) ||
250 !ctx->queueEmpty;
251 }
252 }
253
254
POOL_add_internal(POOL_ctx * ctx,POOL_function function,void * opaque)255 static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
256 {
257 POOL_job const job = {function, opaque};
258 assert(ctx != NULL);
259 if (ctx->shutdown) return;
260
261 ctx->queueEmpty = 0;
262 ctx->queue[ctx->queueTail] = job;
263 ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
264 ZSTD_pthread_cond_signal(&ctx->queuePopCond);
265 }
266
/*! POOL_add() :
 *  Blocking job submission : waits until a queue slot (or, in
 *  zero-capacity mode, an idle worker) becomes available, unless the
 *  pool is shutting down. */
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
{
    assert(ctx != NULL);
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    /* sleep until there is room for the new job; shutdown aborts the wait */
    while (!ctx->shutdown && isQueueFull(ctx)) {
        ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
    }
    POOL_add_internal(ctx, function, opaque);
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
}
278
279
POOL_tryAdd(POOL_ctx * ctx,POOL_function function,void * opaque)280 int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
281 {
282 assert(ctx != NULL);
283 ZSTD_pthread_mutex_lock(&ctx->queueMutex);
284 if (isQueueFull(ctx)) {
285 ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
286 return 0;
287 }
288 POOL_add_internal(ctx, function, opaque);
289 ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
290 return 1;
291 }
292
293
294 #else /* ZSTD_MULTITHREAD not defined */
295
296 /* ========================== */
297 /* No multi-threading support */
298 /* ========================== */
299
300
/* We don't need any data, but if it is empty, malloc() might return NULL. */
struct POOL_ctx_s {
    int dummy;  /* ensures sizeof(POOL_ctx) > 0 */
};
static POOL_ctx g_ctx;  /* shared singleton handed out by every POOL_create() */
306
/*! POOL_create() :
 *  Single-threaded build : forwards to POOL_create_advanced(), which
 *  ignores all parameters and returns the shared static context. */
POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
    return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
}
310
POOL_create_advanced(size_t numThreads,size_t queueSize,ZSTD_customMem customMem)311 POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
312 (void)numThreads;
313 (void)queueSize;
314 (void)customMem;
315 return &g_ctx;
316 }
317
/*! POOL_free() :
 *  Nothing to release : only NULL or the static singleton is expected. */
void POOL_free(POOL_ctx* ctx) {
    assert(ctx == NULL || ctx == &g_ctx);
    (void)ctx;   /* silence unused warning when asserts are compiled out */
}
322
/*! POOL_resize() :
 *  No thread pool exists in single-threaded mode; always succeeds. */
int POOL_resize(POOL_ctx* ctx, size_t numThreads) {
    (void)ctx;
    (void)numThreads;
    return 0;
}
327
/*! POOL_add() :
 *  Without worker threads, the job is simply executed synchronously. */
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
    (void)ctx;
    function(opaque);   /* run immediately, in the calling thread */
}
332
POOL_tryAdd(POOL_ctx * ctx,POOL_function function,void * opaque)333 int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
334 (void)ctx;
335 function(opaque);
336 return 1;
337 }
338
/*! POOL_sizeof() :
 * @return : size of the static context, or 0 for NULL (supports sizeof NULL) */
size_t POOL_sizeof(POOL_ctx* ctx) {
    if (ctx == NULL) return 0;  /* supports sizeof NULL */
    assert(ctx == &g_ctx);      /* only the singleton is ever handed out */
    return sizeof(*ctx);
}
344
345 #endif /* ZSTD_MULTITHREAD */
346