xref: /freebsd/sys/contrib/openzfs/cmd/zstream/zstream_recompress.c (revision 5ca8e32633c4ffbbcd6762e5888b6a4ba0708c6c)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or https://opensource.org/licenses/CDDL-1.0.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2022 Axcient.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * Copyright (c) 2022 by Delphix. All rights reserved.
29  */
30 
31 #include <err.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <unistd.h>
35 #include <sys/zfs_ioctl.h>
36 #include <sys/zio_checksum.h>
37 #include <sys/zstd/zstd.h>
38 #include "zfs_fletcher.h"
39 #include "zstream.h"
40 
41 static int
42 dump_record(dmu_replay_record_t *drr, void *payload, int payload_len,
43     zio_cksum_t *zc, int outfd)
44 {
45 	assert(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum)
46 	    == sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
47 	fletcher_4_incremental_native(drr,
48 	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), zc);
49 	if (drr->drr_type != DRR_BEGIN) {
50 		assert(ZIO_CHECKSUM_IS_ZERO(&drr->drr_u.
51 		    drr_checksum.drr_checksum));
52 		drr->drr_u.drr_checksum.drr_checksum = *zc;
53 	}
54 	fletcher_4_incremental_native(&drr->drr_u.drr_checksum.drr_checksum,
55 	    sizeof (zio_cksum_t), zc);
56 	if (write(outfd, drr, sizeof (*drr)) == -1)
57 		return (errno);
58 	if (payload_len != 0) {
59 		fletcher_4_incremental_native(payload, payload_len, zc);
60 		if (write(outfd, payload, payload_len) == -1)
61 			return (errno);
62 	}
63 	return (0);
64 }
65 
66 int
67 zstream_do_recompress(int argc, char *argv[])
68 {
69 	int bufsz = SPA_MAXBLOCKSIZE;
70 	char *buf = safe_malloc(bufsz);
71 	dmu_replay_record_t thedrr;
72 	dmu_replay_record_t *drr = &thedrr;
73 	zio_cksum_t stream_cksum;
74 	int c;
75 	int level = -1;
76 
77 	while ((c = getopt(argc, argv, "l:")) != -1) {
78 		switch (c) {
79 		case 'l':
80 			if (sscanf(optarg, "%d", &level) != 0) {
81 				fprintf(stderr,
82 				    "failed to parse level '%s'\n",
83 				    optarg);
84 				zstream_usage();
85 			}
86 			break;
87 		case '?':
88 			(void) fprintf(stderr, "invalid option '%c'\n",
89 			    optopt);
90 			zstream_usage();
91 			break;
92 		}
93 	}
94 
95 	argc -= optind;
96 	argv += optind;
97 
98 	if (argc != 1)
99 		zstream_usage();
100 	int type = 0;
101 	zio_compress_info_t *cinfo = NULL;
102 	if (0 == strcmp(argv[0], "off")) {
103 		type = ZIO_COMPRESS_OFF;
104 		cinfo = &zio_compress_table[type];
105 	} else if (0 == strcmp(argv[0], "inherit") ||
106 	    0 == strcmp(argv[0], "empty") ||
107 	    0 == strcmp(argv[0], "on")) {
108 		// Fall through to invalid compression type case
109 	} else {
110 		for (int i = 0; i < ZIO_COMPRESS_FUNCTIONS; i++) {
111 			if (0 == strcmp(zio_compress_table[i].ci_name,
112 			    argv[0])) {
113 				cinfo = &zio_compress_table[i];
114 				type = i;
115 				break;
116 			}
117 		}
118 	}
119 	if (cinfo == NULL) {
120 		fprintf(stderr, "Invalid compression type %s.\n",
121 		    argv[0]);
122 		exit(2);
123 	}
124 
125 	if (cinfo->ci_compress == NULL) {
126 		type = 0;
127 		cinfo = &zio_compress_table[0];
128 	}
129 
130 	if (isatty(STDIN_FILENO)) {
131 		(void) fprintf(stderr,
132 		    "Error: The send stream is a binary format "
133 		    "and can not be read from a\n"
134 		    "terminal.  Standard input must be redirected.\n");
135 		exit(1);
136 	}
137 
138 	fletcher_4_init();
139 	zio_init();
140 	zstd_init();
141 	int begin = 0;
142 	boolean_t seen = B_FALSE;
143 	while (sfread(drr, sizeof (*drr), stdin) != 0) {
144 		struct drr_write *drrw;
145 		uint64_t payload_size = 0;
146 
147 		/*
148 		 * We need to regenerate the checksum.
149 		 */
150 		if (drr->drr_type != DRR_BEGIN) {
151 			memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
152 			    sizeof (drr->drr_u.drr_checksum.drr_checksum));
153 		}
154 
155 
156 		switch (drr->drr_type) {
157 		case DRR_BEGIN:
158 		{
159 			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
160 			VERIFY0(begin++);
161 			seen = B_TRUE;
162 
163 			uint32_t sz = drr->drr_payloadlen;
164 
165 			VERIFY3U(sz, <=, 1U << 28);
166 
167 			if (sz != 0) {
168 				if (sz > bufsz) {
169 					buf = realloc(buf, sz);
170 					if (buf == NULL)
171 						err(1, "realloc");
172 					bufsz = sz;
173 				}
174 				(void) sfread(buf, sz, stdin);
175 			}
176 			payload_size = sz;
177 			break;
178 		}
179 		case DRR_END:
180 		{
181 			struct drr_end *drre = &drr->drr_u.drr_end;
182 			/*
183 			 * We would prefer to just check --begin == 0, but
184 			 * replication streams have an end of stream END
185 			 * record, so we must avoid tripping it.
186 			 */
187 			VERIFY3B(seen, ==, B_TRUE);
188 			begin--;
189 			/*
190 			 * Use the recalculated checksum, unless this is
191 			 * the END record of a stream package, which has
192 			 * no checksum.
193 			 */
194 			if (!ZIO_CHECKSUM_IS_ZERO(&drre->drr_checksum))
195 				drre->drr_checksum = stream_cksum;
196 			break;
197 		}
198 
199 		case DRR_OBJECT:
200 		{
201 			struct drr_object *drro = &drr->drr_u.drr_object;
202 			VERIFY3S(begin, ==, 1);
203 
204 			if (drro->drr_bonuslen > 0) {
205 				payload_size = DRR_OBJECT_PAYLOAD_SIZE(drro);
206 				(void) sfread(buf, payload_size, stdin);
207 			}
208 			break;
209 		}
210 
211 		case DRR_SPILL:
212 		{
213 			struct drr_spill *drrs = &drr->drr_u.drr_spill;
214 			VERIFY3S(begin, ==, 1);
215 			payload_size = DRR_SPILL_PAYLOAD_SIZE(drrs);
216 			(void) sfread(buf, payload_size, stdin);
217 			break;
218 		}
219 
220 		case DRR_WRITE_BYREF:
221 			VERIFY3S(begin, ==, 1);
222 			fprintf(stderr,
223 			    "Deduplicated streams are not supported\n");
224 			exit(1);
225 			break;
226 
227 		case DRR_WRITE:
228 		{
229 			VERIFY3S(begin, ==, 1);
230 			drrw = &thedrr.drr_u.drr_write;
231 			payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw);
232 			/*
233 			 * In order to recompress an encrypted block, you have
234 			 * to decrypt, decompress, recompress, and
235 			 * re-encrypt. That can be a future enhancement (along
236 			 * with decryption or re-encryption), but for now we
237 			 * skip encrypted blocks.
238 			 */
239 			boolean_t encrypted = B_FALSE;
240 			for (int i = 0; i < ZIO_DATA_SALT_LEN; i++) {
241 				if (drrw->drr_salt[i] != 0) {
242 					encrypted = B_TRUE;
243 					break;
244 				}
245 			}
246 			if (encrypted) {
247 				(void) sfread(buf, payload_size, stdin);
248 				break;
249 			}
250 			if (drrw->drr_compressiontype >=
251 			    ZIO_COMPRESS_FUNCTIONS) {
252 				fprintf(stderr, "Invalid compression type in "
253 				    "stream: %d\n", drrw->drr_compressiontype);
254 				exit(3);
255 			}
256 			zio_compress_info_t *dinfo =
257 			    &zio_compress_table[drrw->drr_compressiontype];
258 
259 			/* Set up buffers to minimize memcpys */
260 			char *cbuf, *dbuf;
261 			if (cinfo->ci_compress == NULL)
262 				dbuf = buf;
263 			else
264 				dbuf = safe_calloc(bufsz);
265 
266 			if (dinfo->ci_decompress == NULL)
267 				cbuf = dbuf;
268 			else
269 				cbuf = safe_calloc(payload_size);
270 
271 			/* Read and decompress the payload */
272 			(void) sfread(cbuf, payload_size, stdin);
273 			if (dinfo->ci_decompress != NULL) {
274 				if (0 != dinfo->ci_decompress(cbuf, dbuf,
275 				    payload_size, MIN(bufsz,
276 				    drrw->drr_logical_size), dinfo->ci_level)) {
277 					warnx("decompression type %d failed "
278 					    "for ino %llu offset %llu",
279 					    type,
280 					    (u_longlong_t)drrw->drr_object,
281 					    (u_longlong_t)drrw->drr_offset);
282 					exit(4);
283 				}
284 				payload_size = drrw->drr_logical_size;
285 				free(cbuf);
286 			}
287 
288 			/* Recompress the payload */
289 			if (cinfo->ci_compress != NULL) {
290 				payload_size = P2ROUNDUP(cinfo->ci_compress(
291 				    dbuf, buf, drrw->drr_logical_size,
292 				    MIN(payload_size, bufsz), (level == -1 ?
293 				    cinfo->ci_level : level)),
294 				    SPA_MINBLOCKSIZE);
295 				if (payload_size != drrw->drr_logical_size) {
296 					drrw->drr_compressiontype = type;
297 					drrw->drr_compressed_size =
298 					    payload_size;
299 				} else {
300 					memcpy(buf, dbuf, payload_size);
301 					drrw->drr_compressiontype = 0;
302 					drrw->drr_compressed_size = 0;
303 				}
304 				free(dbuf);
305 			} else {
306 				drrw->drr_compressiontype = type;
307 				drrw->drr_compressed_size = 0;
308 			}
309 			break;
310 		}
311 
312 		case DRR_WRITE_EMBEDDED:
313 		{
314 			struct drr_write_embedded *drrwe =
315 			    &drr->drr_u.drr_write_embedded;
316 			VERIFY3S(begin, ==, 1);
317 			payload_size =
318 			    P2ROUNDUP((uint64_t)drrwe->drr_psize, 8);
319 			(void) sfread(buf, payload_size, stdin);
320 			break;
321 		}
322 
323 		case DRR_FREEOBJECTS:
324 		case DRR_FREE:
325 		case DRR_OBJECT_RANGE:
326 			VERIFY3S(begin, ==, 1);
327 			break;
328 
329 		default:
330 			(void) fprintf(stderr, "INVALID record type 0x%x\n",
331 			    drr->drr_type);
332 			/* should never happen, so assert */
333 			assert(B_FALSE);
334 		}
335 
336 		if (feof(stdout)) {
337 			fprintf(stderr, "Error: unexpected end-of-file\n");
338 			exit(1);
339 		}
340 		if (ferror(stdout)) {
341 			fprintf(stderr, "Error while reading file: %s\n",
342 			    strerror(errno));
343 			exit(1);
344 		}
345 
346 		/*
347 		 * We need to recalculate the checksum, and it needs to be
348 		 * initially zero to do that.  BEGIN records don't have
349 		 * a checksum.
350 		 */
351 		if (drr->drr_type != DRR_BEGIN) {
352 			memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
353 			    sizeof (drr->drr_u.drr_checksum.drr_checksum));
354 		}
355 		if (dump_record(drr, buf, payload_size,
356 		    &stream_cksum, STDOUT_FILENO) != 0)
357 			break;
358 		if (drr->drr_type == DRR_END) {
359 			/*
360 			 * Typically the END record is either the last
361 			 * thing in the stream, or it is followed
362 			 * by a BEGIN record (which also zeros the checksum).
363 			 * However, a stream package ends with two END
364 			 * records.  The last END record's checksum starts
365 			 * from zero.
366 			 */
367 			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
368 		}
369 	}
370 	free(buf);
371 	fletcher_4_fini();
372 	zio_fini();
373 	zstd_fini();
374 
375 	return (0);
376 }
377