1 /* 2 * Copyright (c) 2016-present, Yann Collet, Facebook, Inc. 3 * All rights reserved. 4 * 5 * This source code is licensed under both the BSD-style license (found in the 6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found 7 * in the COPYING file in the root directory of this source tree). 8 * You may select, at your option, one of the above-listed licenses. 9 */ 10 11 12 /* ====== Dependencies ======= */ 13 #include <stddef.h> /* size_t */ 14 #include <stdlib.h> /* malloc, calloc, free */ 15 #include "pool.h" 16 17 /* ====== Compiler specifics ====== */ 18 #if defined(_MSC_VER) 19 # pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */ 20 #endif 21 22 23 #ifdef ZSTD_MULTITHREAD 24 25 #include "threading.h" /* pthread adaptation */ 26 27 /* A job is a function and an opaque argument */ 28 typedef struct POOL_job_s { 29 POOL_function function; 30 void *opaque; 31 } POOL_job; 32 33 struct POOL_ctx_s { 34 ZSTD_customMem customMem; 35 /* Keep track of the threads */ 36 ZSTD_pthread_t *threads; 37 size_t numThreads; 38 39 /* The queue is a circular buffer */ 40 POOL_job *queue; 41 size_t queueHead; 42 size_t queueTail; 43 size_t queueSize; 44 45 /* The number of threads working on jobs */ 46 size_t numThreadsBusy; 47 /* Indicates if the queue is empty */ 48 int queueEmpty; 49 50 /* The mutex protects the queue */ 51 ZSTD_pthread_mutex_t queueMutex; 52 /* Condition variable for pushers to wait on when the queue is full */ 53 ZSTD_pthread_cond_t queuePushCond; 54 /* Condition variables for poppers to wait on when the queue is empty */ 55 ZSTD_pthread_cond_t queuePopCond; 56 /* Indicates if the queue is shutting down */ 57 int shutdown; 58 }; 59 60 /* POOL_thread() : 61 Work thread for the thread pool. 62 Waits for jobs and executes them. 63 @returns : NULL on failure else non-null. 64 */ 65 static void* POOL_thread(void* opaque) { 66 POOL_ctx* const ctx = (POOL_ctx*)opaque; 67 if (!ctx) { return NULL; } 68 for (;;) { 69 /* Lock the mutex and wait for a non-empty queue or until shutdown */ 70 ZSTD_pthread_mutex_lock(&ctx->queueMutex); 71 72 while (ctx->queueEmpty && !ctx->shutdown) { 73 ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); 74 } 75 /* empty => shutting down: so stop */ 76 if (ctx->queueEmpty) { 77 ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 78 return opaque; 79 } 80 /* Pop a job off the queue */ 81 { POOL_job const job = ctx->queue[ctx->queueHead]; 82 ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; 83 ctx->numThreadsBusy++; 84 ctx->queueEmpty = ctx->queueHead == ctx->queueTail; 85 /* Unlock the mutex, signal a pusher, and run the job */ 86 ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 87 ZSTD_pthread_cond_signal(&ctx->queuePushCond); 88 89 job.function(job.opaque); 90 91 /* If the intended queue size was 0, signal after finishing job */ 92 if (ctx->queueSize == 1) { 93 ZSTD_pthread_mutex_lock(&ctx->queueMutex); 94 ctx->numThreadsBusy--; 95 ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 96 ZSTD_pthread_cond_signal(&ctx->queuePushCond); 97 } } 98 } /* for (;;) */ 99 /* Unreachable */ 100 } 101 102 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { 103 return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem); 104 } 105 106 POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) { 107 POOL_ctx* ctx; 108 /* Check the parameters */ 109 if (!numThreads) { return NULL; } 110 /* Allocate the context and zero initialize */ 111 ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem); 112 if (!ctx) { return NULL; } 113 /* Initialize the job queue. 114 * It needs one extra space since one space is wasted to differentiate empty 115 * and full queues. 116 */ 117 ctx->queueSize = queueSize + 1; 118 ctx->queue = (POOL_job*) malloc(ctx->queueSize * sizeof(POOL_job)); 119 ctx->queueHead = 0; 120 ctx->queueTail = 0; 121 ctx->numThreadsBusy = 0; 122 ctx->queueEmpty = 1; 123 (void)ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL); 124 (void)ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL); 125 (void)ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL); 126 ctx->shutdown = 0; 127 /* Allocate space for the thread handles */ 128 ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem); 129 ctx->numThreads = 0; 130 ctx->customMem = customMem; 131 /* Check for errors */ 132 if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; } 133 /* Initialize the threads */ 134 { size_t i; 135 for (i = 0; i < numThreads; ++i) { 136 if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) { 137 ctx->numThreads = i; 138 POOL_free(ctx); 139 return NULL; 140 } } 141 ctx->numThreads = numThreads; 142 } 143 return ctx; 144 } 145 146 /*! POOL_join() : 147 Shutdown the queue, wake any sleeping threads, and join all of the threads. 148 */ 149 static void POOL_join(POOL_ctx* ctx) { 150 /* Shut down the queue */ 151 ZSTD_pthread_mutex_lock(&ctx->queueMutex); 152 ctx->shutdown = 1; 153 ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 154 /* Wake up sleeping threads */ 155 ZSTD_pthread_cond_broadcast(&ctx->queuePushCond); 156 ZSTD_pthread_cond_broadcast(&ctx->queuePopCond); 157 /* Join all of the threads */ 158 { size_t i; 159 for (i = 0; i < ctx->numThreads; ++i) { 160 ZSTD_pthread_join(ctx->threads[i], NULL); 161 } } 162 } 163 164 void POOL_free(POOL_ctx *ctx) { 165 if (!ctx) { return; } 166 POOL_join(ctx); 167 ZSTD_pthread_mutex_destroy(&ctx->queueMutex); 168 ZSTD_pthread_cond_destroy(&ctx->queuePushCond); 169 ZSTD_pthread_cond_destroy(&ctx->queuePopCond); 170 ZSTD_free(ctx->queue, ctx->customMem); 171 ZSTD_free(ctx->threads, ctx->customMem); 172 ZSTD_free(ctx, ctx->customMem); 173 } 174 175 size_t POOL_sizeof(POOL_ctx *ctx) { 176 if (ctx==NULL) return 0; /* supports sizeof NULL */ 177 return sizeof(*ctx) 178 + ctx->queueSize * sizeof(POOL_job) 179 + ctx->numThreads * sizeof(ZSTD_pthread_t); 180 } 181 182 /** 183 * Returns 1 if the queue is full and 0 otherwise. 184 * 185 * If the queueSize is 1 (the pool was created with an intended queueSize of 0), 186 * then a queue is empty if there is a thread free and no job is waiting. 187 */ 188 static int isQueueFull(POOL_ctx const* ctx) { 189 if (ctx->queueSize > 1) { 190 return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize); 191 } else { 192 return ctx->numThreadsBusy == ctx->numThreads || 193 !ctx->queueEmpty; 194 } 195 } 196 197 void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { 198 POOL_ctx* const ctx = (POOL_ctx*)ctxVoid; 199 if (!ctx) { return; } 200 201 ZSTD_pthread_mutex_lock(&ctx->queueMutex); 202 { POOL_job const job = {function, opaque}; 203 204 /* Wait until there is space in the queue for the new job */ 205 while (isQueueFull(ctx) && !ctx->shutdown) { 206 ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); 207 } 208 /* The queue is still going => there is space */ 209 if (!ctx->shutdown) { 210 ctx->queueEmpty = 0; 211 ctx->queue[ctx->queueTail] = job; 212 ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize; 213 } 214 } 215 ZSTD_pthread_mutex_unlock(&ctx->queueMutex); 216 ZSTD_pthread_cond_signal(&ctx->queuePopCond); 217 } 218 219 #else /* ZSTD_MULTITHREAD not defined */ 220 /* No multi-threading support */ 221 222 /* We don't need any data, but if it is empty malloc() might return NULL. */ 223 struct POOL_ctx_s { 224 int dummy; 225 }; 226 static POOL_ctx g_ctx; 227 228 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { 229 return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem); 230 } 231 232 POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) { 233 (void)numThreads; 234 (void)queueSize; 235 (void)customMem; 236 return &g_ctx; 237 } 238 239 void POOL_free(POOL_ctx* ctx) { 240 assert(!ctx || ctx == &g_ctx); 241 (void)ctx; 242 } 243 244 void POOL_add(void* ctx, POOL_function function, void* opaque) { 245 (void)ctx; 246 function(opaque); 247 } 248 249 size_t POOL_sizeof(POOL_ctx* ctx) { 250 if (ctx==NULL) return 0; /* supports sizeof NULL */ 251 assert(ctx == &g_ctx); 252 return sizeof(*ctx); 253 } 254 255 #endif /* ZSTD_MULTITHREAD */ 256