1 // SPDX-License-Identifier: BSD-3-Clause OR GPL-2.0-only 2 /* 3 * Copyright (c) 2016-2020, Yann Collet, Facebook, Inc. 4 * All rights reserved. 5 * 6 * This source code is licensed under both the BSD-style license (found in the 7 * LICENSE file in the root directory of this source tree) and the GPLv2 (found 8 * in the COPYING file in the root directory of this source tree). 9 * You may select, at your option, one of the above-listed licenses. 10 */ 11 12 13 /* ====== Dependencies ======= */ 14 #include <stddef.h> /* size_t */ 15 #include "debug.h" /* assert */ 16 #include "zstd_internal.h" /* ZSTD_malloc, ZSTD_free */ 17 #include "pool.h" 18 19 /* ====== Compiler specifics ====== */ 20 #if defined(_MSC_VER) 21 # pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */ 22 #endif 23 24 25 #ifdef ZSTD_MULTITHREAD 26 27 #include "threading.h" /* pthread adaptation */ 28 29 /* A job is a function and an opaque argument */ 30 typedef struct POOL_job_s { 31 POOL_function function; 32 void *opaque; 33 } POOL_job; 34 35 struct POOL_ctx_s { 36 ZSTD_customMem customMem; 37 /* Keep track of the threads */ 38 ZSTD_pthread_t* threads; 39 size_t threadCapacity; 40 size_t threadLimit; 41 42 /* The queue is a circular buffer */ 43 POOL_job *queue; 44 size_t queueHead; 45 size_t queueTail; 46 size_t queueSize; 47 48 /* The number of threads working on jobs */ 49 size_t numThreadsBusy; 50 /* Indicates if the queue is empty */ 51 int queueEmpty; 52 53 /* The mutex protects the queue */ 54 ZSTD_pthread_mutex_t queueMutex; 55 /* Condition variable for pushers to wait on when the queue is full */ 56 ZSTD_pthread_cond_t queuePushCond; 57 /* Condition variables for poppers to wait on when the queue is empty */ 58 ZSTD_pthread_cond_t queuePopCond; 59 /* Indicates if the queue is shutting down */ 60 int shutdown; 61 }; 62 63 /* POOL_thread() : 64 * Work thread for the thread pool. 65 * Waits for jobs and executes them. 66 * @returns : NULL on failure else non-null. 67 */ 68 static void* POOL_thread(void* opaque) { 69 POOL_ctx* const ctx = (POOL_ctx*)opaque; 70 if (!ctx) { return NULL; } 71 for (;;) { 72 /* Lock the mutex and wait for a non-empty queue or until shutdown */ 73 ZSTD_pthread_mutex_lock(&ctx->queueMutex); 74 75 while ( ctx->queueEmpty 76 || (ctx->numThreadsBusy >= ctx->threadLimit) ) { 77 if (ctx->shutdown) { 78 /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit), 79 * a few threads will be shutdown while !queueEmpty, 80 * but enough threads will remain active to finish the queue */ 81 ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 82 return opaque; 83 } 84 ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); 85 } 86 /* Pop a job off the queue */ 87 { POOL_job const job = ctx->queue[ctx->queueHead]; 88 ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; 89 ctx->numThreadsBusy++; 90 ctx->queueEmpty = ctx->queueHead == ctx->queueTail; 91 /* Unlock the mutex, signal a pusher, and run the job */ 92 ZSTD_pthread_cond_signal(&ctx->queuePushCond); 93 ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 94 95 job.function(job.opaque); 96 97 /* If the intended queue size was 0, signal after finishing job */ 98 ZSTD_pthread_mutex_lock(&ctx->queueMutex); 99 ctx->numThreadsBusy--; 100 if (ctx->queueSize == 1) { 101 ZSTD_pthread_cond_signal(&ctx->queuePushCond); 102 } 103 ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 104 } 105 } /* for (;;) */ 106 assert(0); /* Unreachable */ 107 } 108 109 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { 110 return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem); 111 } 112 113 POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, 114 ZSTD_customMem customMem) { 115 POOL_ctx* ctx; 116 /* Check parameters */ 117 if (!numThreads) { return NULL; } 118 /* Allocate the context and zero initialize */ 119 ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem); 120 if (!ctx) { return NULL; } 121 /* Initialize the job queue. 122 * It needs one extra space since one space is wasted to differentiate 123 * empty and full queues. 124 */ 125 ctx->queueSize = queueSize + 1; 126 ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem); 127 ctx->queueHead = 0; 128 ctx->queueTail = 0; 129 ctx->numThreadsBusy = 0; 130 ctx->queueEmpty = 1; 131 { 132 int error = 0; 133 error |= ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL); 134 error |= ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL); 135 error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL); 136 if (error) { POOL_free(ctx); return NULL; } 137 } 138 ctx->shutdown = 0; 139 /* Allocate space for the thread handles */ 140 ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem); 141 ctx->threadCapacity = 0; 142 ctx->customMem = customMem; 143 /* Check for errors */ 144 if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; } 145 /* Initialize the threads */ 146 { size_t i; 147 for (i = 0; i < numThreads; ++i) { 148 if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) { 149 ctx->threadCapacity = i; 150 POOL_free(ctx); 151 return NULL; 152 } } 153 ctx->threadCapacity = numThreads; 154 ctx->threadLimit = numThreads; 155 } 156 return ctx; 157 } 158 159 /*! POOL_join() : 160 Shutdown the queue, wake any sleeping threads, and join all of the threads. 161 */ 162 static void POOL_join(POOL_ctx* ctx) { 163 /* Shut down the queue */ 164 ZSTD_pthread_mutex_lock(&ctx->queueMutex); 165 ctx->shutdown = 1; 166 ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 167 /* Wake up sleeping threads */ 168 ZSTD_pthread_cond_broadcast(&ctx->queuePushCond); 169 ZSTD_pthread_cond_broadcast(&ctx->queuePopCond); 170 /* Join all of the threads */ 171 { size_t i; 172 for (i = 0; i < ctx->threadCapacity; ++i) { 173 ZSTD_pthread_join(ctx->threads[i], NULL); /* note : could fail */ 174 } } 175 } 176 177 void POOL_free(POOL_ctx *ctx) { 178 if (!ctx) { return; } 179 POOL_join(ctx); 180 ZSTD_pthread_mutex_destroy(&ctx->queueMutex); 181 ZSTD_pthread_cond_destroy(&ctx->queuePushCond); 182 ZSTD_pthread_cond_destroy(&ctx->queuePopCond); 183 ZSTD_free(ctx->queue, ctx->customMem); 184 ZSTD_free(ctx->threads, ctx->customMem); 185 ZSTD_free(ctx, ctx->customMem); 186 } 187 188 189 190 size_t POOL_sizeof(POOL_ctx *ctx) { 191 if (ctx==NULL) return 0; /* supports sizeof NULL */ 192 return sizeof(*ctx) 193 + ctx->queueSize * sizeof(POOL_job) 194 + ctx->threadCapacity * sizeof(ZSTD_pthread_t); 195 } 196 197 198 /* @return : 0 on success, 1 on error */ 199 static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads) 200 { 201 if (numThreads <= ctx->threadCapacity) { 202 if (!numThreads) return 1; 203 ctx->threadLimit = numThreads; 204 return 0; 205 } 206 /* numThreads > threadCapacity */ 207 { ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem); 208 if (!threadPool) return 1; 209 /* replace existing thread pool */ 210 memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool)); 211 ZSTD_free(ctx->threads, ctx->customMem); 212 ctx->threads = threadPool; 213 /* Initialize additional threads */ 214 { size_t threadId; 215 for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) { 216 if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) { 217 ctx->threadCapacity = threadId; 218 return 1; 219 } } 220 } } 221 /* successfully expanded */ 222 ctx->threadCapacity = numThreads; 223 ctx->threadLimit = numThreads; 224 return 0; 225 } 226 227 /* @return : 0 on success, 1 on error */ 228 int POOL_resize(POOL_ctx* ctx, size_t numThreads) 229 { 230 int result; 231 if (ctx==NULL) return 1; 232 ZSTD_pthread_mutex_lock(&ctx->queueMutex); 233 result = POOL_resize_internal(ctx, numThreads); 234 ZSTD_pthread_cond_broadcast(&ctx->queuePopCond); 235 ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 236 return result; 237 } 238 239 /** 240 * Returns 1 if the queue is full and 0 otherwise. 241 * 242 * When queueSize is 1 (pool was created with an intended queueSize of 0), 243 * then a queue is empty if there is a thread free _and_ no job is waiting. 244 */ 245 static int isQueueFull(POOL_ctx const* ctx) { 246 if (ctx->queueSize > 1) { 247 return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize); 248 } else { 249 return (ctx->numThreadsBusy == ctx->threadLimit) || 250 !ctx->queueEmpty; 251 } 252 } 253 254 255 static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque) 256 { 257 POOL_job const job = {function, opaque}; 258 assert(ctx != NULL); 259 if (ctx->shutdown) return; 260 261 ctx->queueEmpty = 0; 262 ctx->queue[ctx->queueTail] = job; 263 ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize; 264 ZSTD_pthread_cond_signal(&ctx->queuePopCond); 265 } 266 267 void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) 268 { 269 assert(ctx != NULL); 270 ZSTD_pthread_mutex_lock(&ctx->queueMutex); 271 /* Wait until there is space in the queue for the new job */ 272 while (isQueueFull(ctx) && (!ctx->shutdown)) { 273 ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); 274 } 275 POOL_add_internal(ctx, function, opaque); 276 ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 277 } 278 279 280 int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) 281 { 282 assert(ctx != NULL); 283 ZSTD_pthread_mutex_lock(&ctx->queueMutex); 284 if (isQueueFull(ctx)) { 285 ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 286 return 0; 287 } 288 POOL_add_internal(ctx, function, opaque); 289 ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 290 return 1; 291 } 292 293 294 #else /* ZSTD_MULTITHREAD not defined */ 295 296 /* ========================== */ 297 /* No multi-threading support */ 298 /* ========================== */ 299 300 301 /* We don't need any data, but if it is empty, malloc() might return NULL. */ 302 struct POOL_ctx_s { 303 int dummy; 304 }; 305 static POOL_ctx g_ctx; 306 307 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { 308 return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem); 309 } 310 311 POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) { 312 (void)numThreads; 313 (void)queueSize; 314 (void)customMem; 315 return &g_ctx; 316 } 317 318 void POOL_free(POOL_ctx* ctx) { 319 assert(!ctx || ctx == &g_ctx); 320 (void)ctx; 321 } 322 323 int POOL_resize(POOL_ctx* ctx, size_t numThreads) { 324 (void)ctx; (void)numThreads; 325 return 0; 326 } 327 328 void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) { 329 (void)ctx; 330 function(opaque); 331 } 332 333 int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) { 334 (void)ctx; 335 function(opaque); 336 return 1; 337 } 338 339 size_t POOL_sizeof(POOL_ctx* ctx) { 340 if (ctx==NULL) return 0; /* supports sizeof NULL */ 341 assert(ctx == &g_ctx); 342 return sizeof(*ctx); 343 } 344 345 #endif /* ZSTD_MULTITHREAD */ 346