xref: /freebsd/sys/contrib/openzfs/cmd/zstream/zstream_recompress.c (revision b2d2a78ad80ec68d4a17f5aef97d21686cb1e29b)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or https://opensource.org/licenses/CDDL-1.0.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2022 Axcient.  All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2022 by Delphix. All rights reserved.
27  * Copyright (c) 2024, Klara, Inc.
28  */
29 
30 #include <err.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <unistd.h>
34 #include <sys/zfs_ioctl.h>
35 #include <sys/zio_checksum.h>
36 #include <sys/zstd/zstd.h>
37 #include "zfs_fletcher.h"
38 #include "zstream.h"
39 
40 static int
41 dump_record(dmu_replay_record_t *drr, void *payload, int payload_len,
42     zio_cksum_t *zc, int outfd)
43 {
44 	assert(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum)
45 	    == sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
46 	fletcher_4_incremental_native(drr,
47 	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), zc);
48 	if (drr->drr_type != DRR_BEGIN) {
49 		assert(ZIO_CHECKSUM_IS_ZERO(&drr->drr_u.
50 		    drr_checksum.drr_checksum));
51 		drr->drr_u.drr_checksum.drr_checksum = *zc;
52 	}
53 	fletcher_4_incremental_native(&drr->drr_u.drr_checksum.drr_checksum,
54 	    sizeof (zio_cksum_t), zc);
55 	if (write(outfd, drr, sizeof (*drr)) == -1)
56 		return (errno);
57 	if (payload_len != 0) {
58 		fletcher_4_incremental_native(payload, payload_len, zc);
59 		if (write(outfd, payload, payload_len) == -1)
60 			return (errno);
61 	}
62 	return (0);
63 }
64 
65 int
66 zstream_do_recompress(int argc, char *argv[])
67 {
68 	int bufsz = SPA_MAXBLOCKSIZE;
69 	char *buf = safe_malloc(bufsz);
70 	dmu_replay_record_t thedrr;
71 	dmu_replay_record_t *drr = &thedrr;
72 	zio_cksum_t stream_cksum;
73 	int c;
74 	int level = 0;
75 
76 	while ((c = getopt(argc, argv, "l:")) != -1) {
77 		switch (c) {
78 		case 'l':
79 			if (sscanf(optarg, "%d", &level) != 1) {
80 				fprintf(stderr,
81 				    "failed to parse level '%s'\n",
82 				    optarg);
83 				zstream_usage();
84 			}
85 			break;
86 		case '?':
87 			(void) fprintf(stderr, "invalid option '%c'\n",
88 			    optopt);
89 			zstream_usage();
90 			break;
91 		}
92 	}
93 
94 	argc -= optind;
95 	argv += optind;
96 
97 	if (argc != 1)
98 		zstream_usage();
99 
100 	enum zio_compress ctype;
101 	if (strcmp(argv[0], "off") == 0) {
102 		ctype = ZIO_COMPRESS_OFF;
103 	} else {
104 		for (ctype = 0; ctype < ZIO_COMPRESS_FUNCTIONS; ctype++) {
105 			if (strcmp(argv[0],
106 			    zio_compress_table[ctype].ci_name) == 0)
107 				break;
108 		}
109 		if (ctype == ZIO_COMPRESS_FUNCTIONS ||
110 		    zio_compress_table[ctype].ci_compress == NULL) {
111 			fprintf(stderr, "Invalid compression type %s.\n",
112 			    argv[0]);
113 			exit(2);
114 		}
115 	}
116 
117 	if (isatty(STDIN_FILENO)) {
118 		(void) fprintf(stderr,
119 		    "Error: The send stream is a binary format "
120 		    "and can not be read from a\n"
121 		    "terminal.  Standard input must be redirected.\n");
122 		exit(1);
123 	}
124 
125 	abd_init();
126 	fletcher_4_init();
127 	zio_init();
128 	zstd_init();
129 	int begin = 0;
130 	boolean_t seen = B_FALSE;
131 	while (sfread(drr, sizeof (*drr), stdin) != 0) {
132 		struct drr_write *drrw;
133 		uint64_t payload_size = 0;
134 
135 		/*
136 		 * We need to regenerate the checksum.
137 		 */
138 		if (drr->drr_type != DRR_BEGIN) {
139 			memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
140 			    sizeof (drr->drr_u.drr_checksum.drr_checksum));
141 		}
142 
143 
144 		switch (drr->drr_type) {
145 		case DRR_BEGIN:
146 		{
147 			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
148 			VERIFY0(begin++);
149 			seen = B_TRUE;
150 
151 			uint32_t sz = drr->drr_payloadlen;
152 
153 			VERIFY3U(sz, <=, 1U << 28);
154 
155 			if (sz != 0) {
156 				if (sz > bufsz) {
157 					buf = realloc(buf, sz);
158 					if (buf == NULL)
159 						err(1, "realloc");
160 					bufsz = sz;
161 				}
162 				(void) sfread(buf, sz, stdin);
163 			}
164 			payload_size = sz;
165 			break;
166 		}
167 		case DRR_END:
168 		{
169 			struct drr_end *drre = &drr->drr_u.drr_end;
170 			/*
171 			 * We would prefer to just check --begin == 0, but
172 			 * replication streams have an end of stream END
173 			 * record, so we must avoid tripping it.
174 			 */
175 			VERIFY3B(seen, ==, B_TRUE);
176 			begin--;
177 			/*
178 			 * Use the recalculated checksum, unless this is
179 			 * the END record of a stream package, which has
180 			 * no checksum.
181 			 */
182 			if (!ZIO_CHECKSUM_IS_ZERO(&drre->drr_checksum))
183 				drre->drr_checksum = stream_cksum;
184 			break;
185 		}
186 
187 		case DRR_OBJECT:
188 		{
189 			struct drr_object *drro = &drr->drr_u.drr_object;
190 			VERIFY3S(begin, ==, 1);
191 
192 			if (drro->drr_bonuslen > 0) {
193 				payload_size = DRR_OBJECT_PAYLOAD_SIZE(drro);
194 				(void) sfread(buf, payload_size, stdin);
195 			}
196 			break;
197 		}
198 
199 		case DRR_SPILL:
200 		{
201 			struct drr_spill *drrs = &drr->drr_u.drr_spill;
202 			VERIFY3S(begin, ==, 1);
203 			payload_size = DRR_SPILL_PAYLOAD_SIZE(drrs);
204 			(void) sfread(buf, payload_size, stdin);
205 			break;
206 		}
207 
208 		case DRR_WRITE_BYREF:
209 			VERIFY3S(begin, ==, 1);
210 			fprintf(stderr,
211 			    "Deduplicated streams are not supported\n");
212 			exit(1);
213 			break;
214 
215 		case DRR_WRITE:
216 		{
217 			VERIFY3S(begin, ==, 1);
218 			drrw = &thedrr.drr_u.drr_write;
219 			payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw);
220 			/*
221 			 * In order to recompress an encrypted block, you have
222 			 * to decrypt, decompress, recompress, and
223 			 * re-encrypt. That can be a future enhancement (along
224 			 * with decryption or re-encryption), but for now we
225 			 * skip encrypted blocks.
226 			 */
227 			boolean_t encrypted = B_FALSE;
228 			for (int i = 0; i < ZIO_DATA_SALT_LEN; i++) {
229 				if (drrw->drr_salt[i] != 0) {
230 					encrypted = B_TRUE;
231 					break;
232 				}
233 			}
234 			if (encrypted) {
235 				(void) sfread(buf, payload_size, stdin);
236 				break;
237 			}
238 			enum zio_compress dtype = drrw->drr_compressiontype;
239 			if (dtype >= ZIO_COMPRESS_FUNCTIONS) {
240 				fprintf(stderr, "Invalid compression type in "
241 				    "stream: %d\n", dtype);
242 				exit(3);
243 			}
244 			if (zio_compress_table[dtype].ci_decompress == NULL)
245 				dtype = ZIO_COMPRESS_OFF;
246 
247 			/* Set up buffers to minimize memcpys */
248 			char *cbuf, *dbuf;
249 			if (ctype == ZIO_COMPRESS_OFF)
250 				dbuf = buf;
251 			else
252 				dbuf = safe_calloc(bufsz);
253 
254 			if (dtype == ZIO_COMPRESS_OFF)
255 				cbuf = dbuf;
256 			else
257 				cbuf = safe_calloc(payload_size);
258 
259 			/* Read and decompress the payload */
260 			(void) sfread(cbuf, payload_size, stdin);
261 			if (dtype != ZIO_COMPRESS_OFF) {
262 				abd_t cabd, dabd;
263 				abd_get_from_buf_struct(&cabd,
264 				    cbuf, payload_size);
265 				abd_get_from_buf_struct(&dabd, dbuf,
266 				    MIN(bufsz, drrw->drr_logical_size));
267 				if (zio_decompress_data(dtype, &cabd, &dabd,
268 				    payload_size, abd_get_size(&dabd),
269 				    NULL) != 0) {
270 					warnx("decompression type %d failed "
271 					    "for ino %llu offset %llu",
272 					    dtype,
273 					    (u_longlong_t)drrw->drr_object,
274 					    (u_longlong_t)drrw->drr_offset);
275 					exit(4);
276 				}
277 				payload_size = drrw->drr_logical_size;
278 				abd_free(&dabd);
279 				abd_free(&cabd);
280 				free(cbuf);
281 			}
282 
283 			/* Recompress the payload */
284 			if (ctype != ZIO_COMPRESS_OFF) {
285 				abd_t dabd, abd;
286 				abd_get_from_buf_struct(&dabd,
287 				    dbuf, drrw->drr_logical_size);
288 				abd_t *pabd =
289 				    abd_get_from_buf_struct(&abd, buf, bufsz);
290 				size_t csize = zio_compress_data(ctype, &dabd,
291 				    &pabd, drrw->drr_logical_size,
292 				    drrw->drr_logical_size, level);
293 				size_t rounded =
294 				    P2ROUNDUP(csize, SPA_MINBLOCKSIZE);
295 				if (rounded >= drrw->drr_logical_size) {
296 					memcpy(buf, dbuf, payload_size);
297 					drrw->drr_compressiontype = 0;
298 					drrw->drr_compressed_size = 0;
299 				} else {
300 					abd_zero_off(pabd, csize,
301 					    rounded - csize);
302 					drrw->drr_compressiontype = ctype;
303 					drrw->drr_compressed_size =
304 					    payload_size = rounded;
305 				}
306 				abd_free(&abd);
307 				abd_free(&dabd);
308 				free(dbuf);
309 			} else {
310 				drrw->drr_compressiontype = 0;
311 				drrw->drr_compressed_size = 0;
312 			}
313 			break;
314 		}
315 
316 		case DRR_WRITE_EMBEDDED:
317 		{
318 			struct drr_write_embedded *drrwe =
319 			    &drr->drr_u.drr_write_embedded;
320 			VERIFY3S(begin, ==, 1);
321 			payload_size =
322 			    P2ROUNDUP((uint64_t)drrwe->drr_psize, 8);
323 			(void) sfread(buf, payload_size, stdin);
324 			break;
325 		}
326 
327 		case DRR_FREEOBJECTS:
328 		case DRR_FREE:
329 		case DRR_OBJECT_RANGE:
330 			VERIFY3S(begin, ==, 1);
331 			break;
332 
333 		default:
334 			(void) fprintf(stderr, "INVALID record type 0x%x\n",
335 			    drr->drr_type);
336 			/* should never happen, so assert */
337 			assert(B_FALSE);
338 		}
339 
340 		if (feof(stdout)) {
341 			fprintf(stderr, "Error: unexpected end-of-file\n");
342 			exit(1);
343 		}
344 		if (ferror(stdout)) {
345 			fprintf(stderr, "Error while reading file: %s\n",
346 			    strerror(errno));
347 			exit(1);
348 		}
349 
350 		/*
351 		 * We need to recalculate the checksum, and it needs to be
352 		 * initially zero to do that.  BEGIN records don't have
353 		 * a checksum.
354 		 */
355 		if (drr->drr_type != DRR_BEGIN) {
356 			memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
357 			    sizeof (drr->drr_u.drr_checksum.drr_checksum));
358 		}
359 		if (dump_record(drr, buf, payload_size,
360 		    &stream_cksum, STDOUT_FILENO) != 0)
361 			break;
362 		if (drr->drr_type == DRR_END) {
363 			/*
364 			 * Typically the END record is either the last
365 			 * thing in the stream, or it is followed
366 			 * by a BEGIN record (which also zeros the checksum).
367 			 * However, a stream package ends with two END
368 			 * records.  The last END record's checksum starts
369 			 * from zero.
370 			 */
371 			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
372 		}
373 	}
374 	free(buf);
375 	fletcher_4_fini();
376 	zio_fini();
377 	zstd_fini();
378 	abd_fini();
379 
380 	return (0);
381 }
382