xref: /freebsd/sys/contrib/openzfs/cmd/zstream/zstream_decompress.c (revision e2df9bb44109577475aeb186e7186ac040f9bde1)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or https://opensource.org/licenses/CDDL-1.0.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2022 Axcient.  All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2024, Klara, Inc.
27  */
28 
29 #include <err.h>
30 #include <search.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <unistd.h>
34 #include <sys/zfs_ioctl.h>
35 #include <sys/zio_checksum.h>
36 #include <sys/zstd/zstd.h>
37 #include "zfs_fletcher.h"
38 #include "zstream.h"
39 
40 static int
dump_record(dmu_replay_record_t * drr,void * payload,int payload_len,zio_cksum_t * zc,int outfd)41 dump_record(dmu_replay_record_t *drr, void *payload, int payload_len,
42     zio_cksum_t *zc, int outfd)
43 {
44 	assert(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum)
45 	    == sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
46 	fletcher_4_incremental_native(drr,
47 	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), zc);
48 	if (drr->drr_type != DRR_BEGIN) {
49 		assert(ZIO_CHECKSUM_IS_ZERO(&drr->drr_u.
50 		    drr_checksum.drr_checksum));
51 		drr->drr_u.drr_checksum.drr_checksum = *zc;
52 	}
53 	fletcher_4_incremental_native(&drr->drr_u.drr_checksum.drr_checksum,
54 	    sizeof (zio_cksum_t), zc);
55 	if (write(outfd, drr, sizeof (*drr)) == -1)
56 		return (errno);
57 	if (payload_len != 0) {
58 		fletcher_4_incremental_native(payload, payload_len, zc);
59 		if (write(outfd, payload, payload_len) == -1)
60 			return (errno);
61 	}
62 	return (0);
63 }
64 
65 int
zstream_do_decompress(int argc,char * argv[])66 zstream_do_decompress(int argc, char *argv[])
67 {
68 	const int KEYSIZE = 64;
69 	int bufsz = SPA_MAXBLOCKSIZE;
70 	char *buf = safe_malloc(bufsz);
71 	dmu_replay_record_t thedrr;
72 	dmu_replay_record_t *drr = &thedrr;
73 	zio_cksum_t stream_cksum;
74 	int c;
75 	boolean_t verbose = B_FALSE;
76 
77 	while ((c = getopt(argc, argv, "v")) != -1) {
78 		switch (c) {
79 		case 'v':
80 			verbose = B_TRUE;
81 			break;
82 		case '?':
83 			(void) fprintf(stderr, "invalid option '%c'\n",
84 			    optopt);
85 			zstream_usage();
86 			break;
87 		}
88 	}
89 
90 	argc -= optind;
91 	argv += optind;
92 
93 	if (argc < 0)
94 		zstream_usage();
95 
96 	if (hcreate(argc) == 0)
97 		errx(1, "hcreate");
98 	for (int i = 0; i < argc; i++) {
99 		uint64_t object, offset;
100 		char *obj_str;
101 		char *offset_str;
102 		char *key;
103 		char *end;
104 		enum zio_compress type = ZIO_COMPRESS_LZ4;
105 
106 		obj_str = strsep(&argv[i], ",");
107 		if (argv[i] == NULL) {
108 			zstream_usage();
109 			exit(2);
110 		}
111 		errno = 0;
112 		object = strtoull(obj_str, &end, 0);
113 		if (errno || *end != '\0')
114 			errx(1, "invalid value for object");
115 		offset_str = strsep(&argv[i], ",");
116 		offset = strtoull(offset_str, &end, 0);
117 		if (errno || *end != '\0')
118 			errx(1, "invalid value for offset");
119 		if (argv[i]) {
120 			if (0 == strcmp("off", argv[i]))
121 				type = ZIO_COMPRESS_OFF;
122 			else if (0 == strcmp("lz4", argv[i]))
123 				type = ZIO_COMPRESS_LZ4;
124 			else if (0 == strcmp("lzjb", argv[i]))
125 				type = ZIO_COMPRESS_LZJB;
126 			else if (0 == strcmp("gzip", argv[i]))
127 				type = ZIO_COMPRESS_GZIP_1;
128 			else if (0 == strcmp("zle", argv[i]))
129 				type = ZIO_COMPRESS_ZLE;
130 			else if (0 == strcmp("zstd", argv[i]))
131 				type = ZIO_COMPRESS_ZSTD;
132 			else {
133 				fprintf(stderr, "Invalid compression type %s.\n"
134 				    "Supported types are off, lz4, lzjb, gzip, "
135 				    "zle, and zstd\n",
136 				    argv[i]);
137 				exit(2);
138 			}
139 		}
140 
141 		if (asprintf(&key, "%llu,%llu", (u_longlong_t)object,
142 		    (u_longlong_t)offset) < 0) {
143 			err(1, "asprintf");
144 		}
145 		ENTRY e = {.key = key};
146 		ENTRY *p;
147 
148 		p = hsearch(e, ENTER);
149 		if (p == NULL)
150 			errx(1, "hsearch");
151 		p->data = (void*)(intptr_t)type;
152 	}
153 
154 	if (isatty(STDIN_FILENO)) {
155 		(void) fprintf(stderr,
156 		    "Error: The send stream is a binary format "
157 		    "and can not be read from a\n"
158 		    "terminal.  Standard input must be redirected.\n");
159 		exit(1);
160 	}
161 
162 	fletcher_4_init();
163 	int begin = 0;
164 	boolean_t seen = B_FALSE;
165 	while (sfread(drr, sizeof (*drr), stdin) != 0) {
166 		struct drr_write *drrw;
167 		uint64_t payload_size = 0;
168 
169 		/*
170 		 * We need to regenerate the checksum.
171 		 */
172 		if (drr->drr_type != DRR_BEGIN) {
173 			memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
174 			    sizeof (drr->drr_u.drr_checksum.drr_checksum));
175 		}
176 
177 		switch (drr->drr_type) {
178 		case DRR_BEGIN:
179 		{
180 			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
181 			VERIFY0(begin++);
182 			seen = B_TRUE;
183 
184 			uint32_t sz = drr->drr_payloadlen;
185 
186 			VERIFY3U(sz, <=, 1U << 28);
187 
188 			if (sz != 0) {
189 				if (sz > bufsz) {
190 					buf = realloc(buf, sz);
191 					if (buf == NULL)
192 						err(1, "realloc");
193 					bufsz = sz;
194 				}
195 				(void) sfread(buf, sz, stdin);
196 			}
197 			payload_size = sz;
198 			break;
199 		}
200 		case DRR_END:
201 		{
202 			struct drr_end *drre = &drr->drr_u.drr_end;
203 			/*
204 			 * We would prefer to just check --begin == 0, but
205 			 * replication streams have an end of stream END
206 			 * record, so we must avoid tripping it.
207 			 */
208 			VERIFY3B(seen, ==, B_TRUE);
209 			begin--;
210 			/*
211 			 * Use the recalculated checksum, unless this is
212 			 * the END record of a stream package, which has
213 			 * no checksum.
214 			 */
215 			if (!ZIO_CHECKSUM_IS_ZERO(&drre->drr_checksum))
216 				drre->drr_checksum = stream_cksum;
217 			break;
218 		}
219 
220 		case DRR_OBJECT:
221 		{
222 			struct drr_object *drro = &drr->drr_u.drr_object;
223 			VERIFY3S(begin, ==, 1);
224 
225 			if (drro->drr_bonuslen > 0) {
226 				payload_size = DRR_OBJECT_PAYLOAD_SIZE(drro);
227 				(void) sfread(buf, payload_size, stdin);
228 			}
229 			break;
230 		}
231 
232 		case DRR_SPILL:
233 		{
234 			struct drr_spill *drrs = &drr->drr_u.drr_spill;
235 			VERIFY3S(begin, ==, 1);
236 			payload_size = DRR_SPILL_PAYLOAD_SIZE(drrs);
237 			(void) sfread(buf, payload_size, stdin);
238 			break;
239 		}
240 
241 		case DRR_WRITE_BYREF:
242 			VERIFY3S(begin, ==, 1);
243 			fprintf(stderr,
244 			    "Deduplicated streams are not supported\n");
245 			exit(1);
246 			break;
247 
248 		case DRR_WRITE:
249 		{
250 			VERIFY3S(begin, ==, 1);
251 			drrw = &thedrr.drr_u.drr_write;
252 			payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw);
253 			ENTRY *p;
254 			char key[KEYSIZE];
255 
256 			snprintf(key, KEYSIZE, "%llu,%llu",
257 			    (u_longlong_t)drrw->drr_object,
258 			    (u_longlong_t)drrw->drr_offset);
259 			ENTRY e = {.key = key};
260 
261 			p = hsearch(e, FIND);
262 			if (p == NULL) {
263 				/*
264 				 * Read the contents of the block unaltered
265 				 */
266 				(void) sfread(buf, payload_size, stdin);
267 				break;
268 			}
269 
270 			/*
271 			 * Read and decompress the block
272 			 */
273 			enum zio_compress c =
274 			    (enum zio_compress)(intptr_t)p->data;
275 
276 			if (c == ZIO_COMPRESS_OFF) {
277 				(void) sfread(buf, payload_size, stdin);
278 				drrw->drr_compressiontype = 0;
279 				drrw->drr_compressed_size = 0;
280 				if (verbose)
281 					fprintf(stderr,
282 					    "Resetting compression type to "
283 					    "off for ino %llu offset %llu\n",
284 					    (u_longlong_t)drrw->drr_object,
285 					    (u_longlong_t)drrw->drr_offset);
286 				break;
287 			}
288 
289 			uint64_t lsize = drrw->drr_logical_size;
290 			ASSERT3U(payload_size, <=, lsize);
291 
292 			char *lzbuf = safe_calloc(payload_size);
293 			(void) sfread(lzbuf, payload_size, stdin);
294 
295 			abd_t sabd, dabd;
296 			abd_get_from_buf_struct(&sabd, lzbuf, payload_size);
297 			abd_get_from_buf_struct(&dabd, buf, lsize);
298 			int err = zio_decompress_data(c, &sabd, &dabd,
299 			    payload_size, lsize, NULL);
300 			abd_free(&dabd);
301 			abd_free(&sabd);
302 
303 			if (err == 0) {
304 				drrw->drr_compressiontype = 0;
305 				drrw->drr_compressed_size = 0;
306 				payload_size = lsize;
307 				if (verbose) {
308 					fprintf(stderr,
309 					    "successfully decompressed "
310 					    "ino %llu offset %llu\n",
311 					    (u_longlong_t)drrw->drr_object,
312 					    (u_longlong_t)drrw->drr_offset);
313 				}
314 			} else {
315 				/*
316 				 * The block must not be compressed, at least
317 				 * not with this compression type, possibly
318 				 * because it gets written multiple times in
319 				 * this stream.
320 				 */
321 				warnx("decompression failed for "
322 				    "ino %llu offset %llu",
323 				    (u_longlong_t)drrw->drr_object,
324 				    (u_longlong_t)drrw->drr_offset);
325 				memcpy(buf, lzbuf, payload_size);
326 			}
327 
328 			free(lzbuf);
329 			break;
330 		}
331 
332 		case DRR_WRITE_EMBEDDED:
333 		{
334 			VERIFY3S(begin, ==, 1);
335 			struct drr_write_embedded *drrwe =
336 			    &drr->drr_u.drr_write_embedded;
337 			payload_size =
338 			    P2ROUNDUP((uint64_t)drrwe->drr_psize, 8);
339 			(void) sfread(buf, payload_size, stdin);
340 			break;
341 		}
342 
343 		case DRR_FREEOBJECTS:
344 		case DRR_FREE:
345 		case DRR_OBJECT_RANGE:
346 			VERIFY3S(begin, ==, 1);
347 			break;
348 
349 		default:
350 			(void) fprintf(stderr, "INVALID record type 0x%x\n",
351 			    drr->drr_type);
352 			/* should never happen, so assert */
353 			assert(B_FALSE);
354 		}
355 
356 		if (feof(stdout)) {
357 			fprintf(stderr, "Error: unexpected end-of-file\n");
358 			exit(1);
359 		}
360 		if (ferror(stdout)) {
361 			fprintf(stderr, "Error while reading file: %s\n",
362 			    strerror(errno));
363 			exit(1);
364 		}
365 
366 		/*
367 		 * We need to recalculate the checksum, and it needs to be
368 		 * initially zero to do that.  BEGIN records don't have
369 		 * a checksum.
370 		 */
371 		if (drr->drr_type != DRR_BEGIN) {
372 			memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
373 			    sizeof (drr->drr_u.drr_checksum.drr_checksum));
374 		}
375 		if (dump_record(drr, buf, payload_size,
376 		    &stream_cksum, STDOUT_FILENO) != 0)
377 			break;
378 		if (drr->drr_type == DRR_END) {
379 			/*
380 			 * Typically the END record is either the last
381 			 * thing in the stream, or it is followed
382 			 * by a BEGIN record (which also zeros the checksum).
383 			 * However, a stream package ends with two END
384 			 * records.  The last END record's checksum starts
385 			 * from zero.
386 			 */
387 			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
388 		}
389 	}
390 	free(buf);
391 	fletcher_4_fini();
392 	hdestroy();
393 
394 	return (0);
395 }
396