xref: /freebsd/sys/contrib/zstd/examples/streaming_compression_thread_pool.c (revision dd41de95a84d979615a2ef11df6850622bf6184e)
1 /*
2  * Copyright (c) 2020, Martin Liska, SUSE, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under both the BSD-style license (found in the
6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7  * in the COPYING file in the root directory of this source tree).
8  * You may select, at your option, one of the above-listed licenses.
9  */
10 
11 
12 #include <stdio.h>     // printf
13 #include <stdlib.h>    // free
14 #include <string.h>    // memset, strcat, strlen
15 #include <zstd.h>      // presumes zstd library is installed
16 #include "common.h"    // Helper functions, CHECK(), and CHECK_ZSTD()
17 #include <pthread.h>
18 
19 typedef struct compress_args
20 {
21   const char *fname;
22   char *outName;
23   int cLevel;
24 #if defined(ZSTD_STATIC_LINKING_ONLY)
25   ZSTD_threadPool *pool;
26 #endif
27 } compress_args_t;
28 
29 static void *compressFile_orDie(void *data)
30 {
31     compress_args_t *args = (compress_args_t *)data;
32     fprintf (stderr, "Starting compression of %s with level %d\n", args->fname, args->cLevel);
33     /* Open the input and output files. */
34     FILE* const fin  = fopen_orDie(args->fname, "rb");
35     FILE* const fout = fopen_orDie(args->outName, "wb");
36     /* Create the input and output buffers.
37      * They may be any size, but we recommend using these functions to size them.
38      * Performance will only suffer significantly for very tiny buffers.
39      */
40     size_t const buffInSize = ZSTD_CStreamInSize();
41     void*  const buffIn  = malloc_orDie(buffInSize);
42     size_t const buffOutSize = ZSTD_CStreamOutSize();
43     void*  const buffOut = malloc_orDie(buffOutSize);
44 
45     /* Create the context. */
46     ZSTD_CCtx* const cctx = ZSTD_createCCtx();
47     CHECK(cctx != NULL, "ZSTD_createCCtx() failed!");
48 
49 #if defined(ZSTD_STATIC_LINKING_ONLY)
50     size_t r = ZSTD_CCtx_refThreadPool(cctx, args->pool);
51     CHECK(r == 0, "ZSTD_CCtx_refThreadPool failed!");
52 #endif
53 
54     /* Set any parameters you want.
55      * Here we set the compression level, and enable the checksum.
56      */
57     CHECK_ZSTD( ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, args->cLevel) );
58     CHECK_ZSTD( ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1) );
59     ZSTD_CCtx_setParameter(cctx, ZSTD_c_nbWorkers, 16);
60 
61     /* This loop read from the input file, compresses that entire chunk,
62      * and writes all output produced to the output file.
63      */
64     size_t const toRead = buffInSize;
65     for (;;) {
66         size_t read = fread_orDie(buffIn, toRead, fin);
67         /* Select the flush mode.
68          * If the read may not be finished (read == toRead) we use
69          * ZSTD_e_continue. If this is the last chunk, we use ZSTD_e_end.
70          * Zstd optimizes the case where the first flush mode is ZSTD_e_end,
71          * since it knows it is compressing the entire source in one pass.
72          */
73         int const lastChunk = (read < toRead);
74         ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue;
75         /* Set the input buffer to what we just read.
76          * We compress until the input buffer is empty, each time flushing the
77          * output.
78          */
79         ZSTD_inBuffer input = { buffIn, read, 0 };
80         int finished;
81         do {
82             /* Compress into the output buffer and write all of the output to
83              * the file so we can reuse the buffer next iteration.
84              */
85             ZSTD_outBuffer output = { buffOut, buffOutSize, 0 };
86             size_t const remaining = ZSTD_compressStream2(cctx, &output , &input, mode);
87             CHECK_ZSTD(remaining);
88             fwrite_orDie(buffOut, output.pos, fout);
89             /* If we're on the last chunk we're finished when zstd returns 0,
90              * which means its consumed all the input AND finished the frame.
91              * Otherwise, we're finished when we've consumed all the input.
92              */
93             finished = lastChunk ? (remaining == 0) : (input.pos == input.size);
94         } while (!finished);
95         CHECK(input.pos == input.size,
96               "Impossible: zstd only returns 0 when the input is completely consumed!");
97 
98         if (lastChunk) {
99             break;
100         }
101     }
102 
103     fprintf (stderr, "Finishing compression of %s\n", args->outName);
104 
105     ZSTD_freeCCtx(cctx);
106     fclose_orDie(fout);
107     fclose_orDie(fin);
108     free(buffIn);
109     free(buffOut);
110     free(args->outName);
111 
112     return NULL;
113 }
114 
115 
116 static char* createOutFilename_orDie(const char* filename)
117 {
118     size_t const inL = strlen(filename);
119     size_t const outL = inL + 5;
120     void* const outSpace = malloc_orDie(outL);
121     memset(outSpace, 0, outL);
122     strcat(outSpace, filename);
123     strcat(outSpace, ".zst");
124     return (char*)outSpace;
125 }
126 
127 int main(int argc, const char** argv)
128 {
129     const char* const exeName = argv[0];
130 
131     if (argc<=3) {
132         printf("wrong arguments\n");
133         printf("usage:\n");
134         printf("%s POOL_SIZE LEVEL FILES\n", exeName);
135         return 1;
136     }
137 
138     int pool_size = atoi (argv[1]);
139     CHECK(pool_size != 0, "can't parse POOL_SIZE!");
140 
141     int level = atoi (argv[2]);
142     CHECK(level != 0, "can't parse LEVEL!");
143 
144     argc -= 3;
145     argv += 3;
146 
147 #if defined(ZSTD_STATIC_LINKING_ONLY)
148     ZSTD_threadPool *pool = ZSTD_createThreadPool (pool_size);
149     CHECK(pool != NULL, "ZSTD_createThreadPool() failed!");
150     fprintf (stderr, "Using shared thread pool of size %d\n", pool_size);
151 #else
152     fprintf (stderr, "All threads use its own thread pool\n");
153 #endif
154 
155     pthread_t *threads = malloc_orDie(argc * sizeof(pthread_t));
156     compress_args_t *args = malloc_orDie(argc * sizeof(compress_args_t));
157 
158     for (unsigned i = 0; i < argc; i++)
159     {
160       args[i].fname = argv[i];
161       args[i].outName = createOutFilename_orDie(args[i].fname);
162       args[i].cLevel = level;
163 #if defined(ZSTD_STATIC_LINKING_ONLY)
164       args[i].pool = pool;
165 #endif
166 
167       pthread_create (&threads[i], NULL, compressFile_orDie, &args[i]);
168     }
169 
170     for (unsigned i = 0; i < argc; i++)
171       pthread_join (threads[i], NULL);
172 
173 #if defined(ZSTD_STATIC_LINKING_ONLY)
174     ZSTD_freeThreadPool (pool);
175 #endif
176 
177     return 0;
178 }
179