xref: /freebsd/sys/contrib/openzfs/cmd/zstream/zstream_recompress.c (revision 61145dc2b94f12f6a47344fb9aac702321880e43)
1 // SPDX-License-Identifier: CDDL-1.0
2 /*
3  * CDDL HEADER START
4  *
5  * The contents of this file are subject to the terms of the
6  * Common Development and Distribution License (the "License").
7  * You may not use this file except in compliance with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or https://opensource.org/licenses/CDDL-1.0.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 
23 /*
24  * Copyright 2022 Axcient.  All rights reserved.
25  * Use is subject to license terms.
26  *
27  * Copyright (c) 2022 by Delphix. All rights reserved.
28  * Copyright (c) 2024, Klara, Inc.
29  */
30 
31 #include <err.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <unistd.h>
35 #include <sys/zfs_ioctl.h>
36 #include <sys/zio_checksum.h>
37 #include <sys/zstd/zstd.h>
38 #include "zfs_fletcher.h"
39 #include "zstream.h"
40 
41 static int
dump_record(dmu_replay_record_t * drr,void * payload,int payload_len,zio_cksum_t * zc,int outfd)42 dump_record(dmu_replay_record_t *drr, void *payload, int payload_len,
43     zio_cksum_t *zc, int outfd)
44 {
45 	assert(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum)
46 	    == sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
47 	fletcher_4_incremental_native(drr,
48 	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), zc);
49 	if (drr->drr_type != DRR_BEGIN) {
50 		assert(ZIO_CHECKSUM_IS_ZERO(&drr->drr_u.
51 		    drr_checksum.drr_checksum));
52 		drr->drr_u.drr_checksum.drr_checksum = *zc;
53 	}
54 	fletcher_4_incremental_native(&drr->drr_u.drr_checksum.drr_checksum,
55 	    sizeof (zio_cksum_t), zc);
56 	if (write(outfd, drr, sizeof (*drr)) == -1)
57 		return (errno);
58 	if (payload_len != 0) {
59 		fletcher_4_incremental_native(payload, payload_len, zc);
60 		if (write(outfd, payload, payload_len) == -1)
61 			return (errno);
62 	}
63 	return (0);
64 }
65 
66 int
zstream_do_recompress(int argc,char * argv[])67 zstream_do_recompress(int argc, char *argv[])
68 {
69 	int bufsz = SPA_MAXBLOCKSIZE;
70 	char *buf = safe_malloc(bufsz);
71 	dmu_replay_record_t thedrr;
72 	dmu_replay_record_t *drr = &thedrr;
73 	zio_cksum_t stream_cksum;
74 	int c;
75 	int level = 0;
76 
77 	while ((c = getopt(argc, argv, "l:")) != -1) {
78 		switch (c) {
79 		case 'l':
80 			if (sscanf(optarg, "%d", &level) != 1) {
81 				fprintf(stderr,
82 				    "failed to parse level '%s'\n",
83 				    optarg);
84 				zstream_usage();
85 			}
86 			break;
87 		case '?':
88 			(void) fprintf(stderr, "invalid option '%c'\n",
89 			    optopt);
90 			zstream_usage();
91 			break;
92 		}
93 	}
94 
95 	argc -= optind;
96 	argv += optind;
97 
98 	if (argc != 1)
99 		zstream_usage();
100 
101 	enum zio_compress ctype;
102 	if (strcmp(argv[0], "off") == 0) {
103 		ctype = ZIO_COMPRESS_OFF;
104 	} else {
105 		for (ctype = 0; ctype < ZIO_COMPRESS_FUNCTIONS; ctype++) {
106 			if (strcmp(argv[0],
107 			    zio_compress_table[ctype].ci_name) == 0)
108 				break;
109 		}
110 		if (ctype == ZIO_COMPRESS_FUNCTIONS ||
111 		    zio_compress_table[ctype].ci_compress == NULL) {
112 			fprintf(stderr, "Invalid compression type %s.\n",
113 			    argv[0]);
114 			exit(2);
115 		}
116 	}
117 
118 	if (isatty(STDIN_FILENO)) {
119 		(void) fprintf(stderr,
120 		    "Error: The send stream is a binary format "
121 		    "and can not be read from a\n"
122 		    "terminal.  Standard input must be redirected.\n");
123 		exit(1);
124 	}
125 
126 	abd_init();
127 	fletcher_4_init();
128 	zio_init();
129 	zstd_init();
130 	int begin = 0;
131 	boolean_t seen = B_FALSE;
132 	while (sfread(drr, sizeof (*drr), stdin) != 0) {
133 		struct drr_write *drrw;
134 		uint64_t payload_size = 0;
135 
136 		/*
137 		 * We need to regenerate the checksum.
138 		 */
139 		if (drr->drr_type != DRR_BEGIN) {
140 			memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
141 			    sizeof (drr->drr_u.drr_checksum.drr_checksum));
142 		}
143 
144 
145 		switch (drr->drr_type) {
146 		case DRR_BEGIN:
147 		{
148 			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
149 			VERIFY0(begin++);
150 			seen = B_TRUE;
151 
152 			uint32_t sz = drr->drr_payloadlen;
153 
154 			VERIFY3U(sz, <=, 1U << 28);
155 
156 			if (sz != 0) {
157 				if (sz > bufsz) {
158 					buf = realloc(buf, sz);
159 					if (buf == NULL)
160 						err(1, "realloc");
161 					bufsz = sz;
162 				}
163 				(void) sfread(buf, sz, stdin);
164 			}
165 			payload_size = sz;
166 			break;
167 		}
168 		case DRR_END:
169 		{
170 			struct drr_end *drre = &drr->drr_u.drr_end;
171 			/*
172 			 * We would prefer to just check --begin == 0, but
173 			 * replication streams have an end of stream END
174 			 * record, so we must avoid tripping it.
175 			 */
176 			VERIFY3B(seen, ==, B_TRUE);
177 			begin--;
178 			/*
179 			 * Use the recalculated checksum, unless this is
180 			 * the END record of a stream package, which has
181 			 * no checksum.
182 			 */
183 			if (!ZIO_CHECKSUM_IS_ZERO(&drre->drr_checksum))
184 				drre->drr_checksum = stream_cksum;
185 			break;
186 		}
187 
188 		case DRR_OBJECT:
189 		{
190 			struct drr_object *drro = &drr->drr_u.drr_object;
191 			VERIFY3S(begin, ==, 1);
192 
193 			if (drro->drr_bonuslen > 0) {
194 				payload_size = DRR_OBJECT_PAYLOAD_SIZE(drro);
195 				(void) sfread(buf, payload_size, stdin);
196 			}
197 			break;
198 		}
199 
200 		case DRR_SPILL:
201 		{
202 			struct drr_spill *drrs = &drr->drr_u.drr_spill;
203 			VERIFY3S(begin, ==, 1);
204 			payload_size = DRR_SPILL_PAYLOAD_SIZE(drrs);
205 			(void) sfread(buf, payload_size, stdin);
206 			break;
207 		}
208 
209 		case DRR_WRITE_BYREF:
210 			VERIFY3S(begin, ==, 1);
211 			fprintf(stderr,
212 			    "Deduplicated streams are not supported\n");
213 			exit(1);
214 			break;
215 
216 		case DRR_WRITE:
217 		{
218 			VERIFY3S(begin, ==, 1);
219 			drrw = &thedrr.drr_u.drr_write;
220 			payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw);
221 			/*
222 			 * In order to recompress an encrypted block, you have
223 			 * to decrypt, decompress, recompress, and
224 			 * re-encrypt. That can be a future enhancement (along
225 			 * with decryption or re-encryption), but for now we
226 			 * skip encrypted blocks.
227 			 */
228 			boolean_t encrypted = B_FALSE;
229 			for (int i = 0; i < ZIO_DATA_SALT_LEN; i++) {
230 				if (drrw->drr_salt[i] != 0) {
231 					encrypted = B_TRUE;
232 					break;
233 				}
234 			}
235 			if (encrypted) {
236 				(void) sfread(buf, payload_size, stdin);
237 				break;
238 			}
239 			enum zio_compress dtype = drrw->drr_compressiontype;
240 			if (dtype >= ZIO_COMPRESS_FUNCTIONS) {
241 				fprintf(stderr, "Invalid compression type in "
242 				    "stream: %d\n", dtype);
243 				exit(3);
244 			}
245 			if (zio_compress_table[dtype].ci_decompress == NULL)
246 				dtype = ZIO_COMPRESS_OFF;
247 
248 			/* Set up buffers to minimize memcpys */
249 			char *cbuf, *dbuf;
250 			if (ctype == ZIO_COMPRESS_OFF)
251 				dbuf = buf;
252 			else
253 				dbuf = safe_calloc(bufsz);
254 
255 			if (dtype == ZIO_COMPRESS_OFF)
256 				cbuf = dbuf;
257 			else
258 				cbuf = safe_calloc(payload_size);
259 
260 			/* Read and decompress the payload */
261 			(void) sfread(cbuf, payload_size, stdin);
262 			if (dtype != ZIO_COMPRESS_OFF) {
263 				abd_t cabd, dabd;
264 				abd_get_from_buf_struct(&cabd,
265 				    cbuf, payload_size);
266 				abd_get_from_buf_struct(&dabd, dbuf,
267 				    MIN(bufsz, drrw->drr_logical_size));
268 				if (zio_decompress_data(dtype, &cabd, &dabd,
269 				    payload_size, abd_get_size(&dabd),
270 				    NULL) != 0) {
271 					warnx("decompression type %d failed "
272 					    "for ino %llu offset %llu",
273 					    dtype,
274 					    (u_longlong_t)drrw->drr_object,
275 					    (u_longlong_t)drrw->drr_offset);
276 					exit(4);
277 				}
278 				payload_size = drrw->drr_logical_size;
279 				abd_free(&dabd);
280 				abd_free(&cabd);
281 				free(cbuf);
282 			}
283 
284 			/* Recompress the payload */
285 			if (ctype != ZIO_COMPRESS_OFF) {
286 				abd_t dabd, abd;
287 				abd_get_from_buf_struct(&dabd,
288 				    dbuf, drrw->drr_logical_size);
289 				abd_t *pabd =
290 				    abd_get_from_buf_struct(&abd, buf, bufsz);
291 				size_t csize = zio_compress_data(ctype, &dabd,
292 				    &pabd, drrw->drr_logical_size,
293 				    drrw->drr_logical_size, level);
294 				size_t rounded =
295 				    P2ROUNDUP(csize, SPA_MINBLOCKSIZE);
296 				if (rounded >= drrw->drr_logical_size) {
297 					memcpy(buf, dbuf, payload_size);
298 					drrw->drr_compressiontype = 0;
299 					drrw->drr_compressed_size = 0;
300 				} else {
301 					abd_zero_off(pabd, csize,
302 					    rounded - csize);
303 					drrw->drr_compressiontype = ctype;
304 					drrw->drr_compressed_size =
305 					    payload_size = rounded;
306 				}
307 				abd_free(&abd);
308 				abd_free(&dabd);
309 				free(dbuf);
310 			} else {
311 				drrw->drr_compressiontype = 0;
312 				drrw->drr_compressed_size = 0;
313 			}
314 			break;
315 		}
316 
317 		case DRR_WRITE_EMBEDDED:
318 		{
319 			struct drr_write_embedded *drrwe =
320 			    &drr->drr_u.drr_write_embedded;
321 			VERIFY3S(begin, ==, 1);
322 			payload_size =
323 			    P2ROUNDUP((uint64_t)drrwe->drr_psize, 8);
324 			(void) sfread(buf, payload_size, stdin);
325 			break;
326 		}
327 
328 		case DRR_FREEOBJECTS:
329 		case DRR_FREE:
330 		case DRR_OBJECT_RANGE:
331 			VERIFY3S(begin, ==, 1);
332 			break;
333 
334 		default:
335 			(void) fprintf(stderr, "INVALID record type 0x%x\n",
336 			    drr->drr_type);
337 			/* should never happen, so assert */
338 			assert(B_FALSE);
339 		}
340 
341 		if (feof(stdout)) {
342 			fprintf(stderr, "Error: unexpected end-of-file\n");
343 			exit(1);
344 		}
345 		if (ferror(stdout)) {
346 			fprintf(stderr, "Error while reading file: %s\n",
347 			    strerror(errno));
348 			exit(1);
349 		}
350 
351 		/*
352 		 * We need to recalculate the checksum, and it needs to be
353 		 * initially zero to do that.  BEGIN records don't have
354 		 * a checksum.
355 		 */
356 		if (drr->drr_type != DRR_BEGIN) {
357 			memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
358 			    sizeof (drr->drr_u.drr_checksum.drr_checksum));
359 		}
360 		if (dump_record(drr, buf, payload_size,
361 		    &stream_cksum, STDOUT_FILENO) != 0)
362 			break;
363 		if (drr->drr_type == DRR_END) {
364 			/*
365 			 * Typically the END record is either the last
366 			 * thing in the stream, or it is followed
367 			 * by a BEGIN record (which also zeros the checksum).
368 			 * However, a stream package ends with two END
369 			 * records.  The last END record's checksum starts
370 			 * from zero.
371 			 */
372 			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
373 		}
374 	}
375 	free(buf);
376 	fletcher_4_fini();
377 	zio_fini();
378 	zstd_fini();
379 	abd_fini();
380 
381 	return (0);
382 }
383