1 // SPDX-License-Identifier: CDDL-1.0
2 /*
3 * CDDL HEADER START
4 *
5 * The contents of this file are subject to the terms of the
6 * Common Development and Distribution License (the "License").
7 * You may not use this file except in compliance with the License.
8 *
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or https://opensource.org/licenses/CDDL-1.0.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
13 *
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 *
20 * CDDL HEADER END
21 */
22
23 /*
24 * Copyright 2022 Axcient. All rights reserved.
25 * Use is subject to license terms.
26 *
27 * Copyright (c) 2024, Klara, Inc.
28 */
29
30 #include <err.h>
31 #include <search.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <unistd.h>
35 #include <sys/zfs_ioctl.h>
36 #include <sys/zio_checksum.h>
37 #include <sys/zstd/zstd.h>
38 #include "zfs_fletcher.h"
39 #include "zstream.h"
40
41 static int
dump_record(dmu_replay_record_t * drr,void * payload,int payload_len,zio_cksum_t * zc,int outfd)42 dump_record(dmu_replay_record_t *drr, void *payload, int payload_len,
43 zio_cksum_t *zc, int outfd)
44 {
45 assert(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum)
46 == sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
47 fletcher_4_incremental_native(drr,
48 offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), zc);
49 if (drr->drr_type != DRR_BEGIN) {
50 assert(ZIO_CHECKSUM_IS_ZERO(&drr->drr_u.
51 drr_checksum.drr_checksum));
52 drr->drr_u.drr_checksum.drr_checksum = *zc;
53 }
54 fletcher_4_incremental_native(&drr->drr_u.drr_checksum.drr_checksum,
55 sizeof (zio_cksum_t), zc);
56 if (write(outfd, drr, sizeof (*drr)) == -1)
57 return (errno);
58 if (payload_len != 0) {
59 fletcher_4_incremental_native(payload, payload_len, zc);
60 if (write(outfd, payload, payload_len) == -1)
61 return (errno);
62 }
63 return (0);
64 }
65
66 int
zstream_do_decompress(int argc,char * argv[])67 zstream_do_decompress(int argc, char *argv[])
68 {
69 const int KEYSIZE = 64;
70 int bufsz = SPA_MAXBLOCKSIZE;
71 char *buf = safe_malloc(bufsz);
72 dmu_replay_record_t thedrr;
73 dmu_replay_record_t *drr = &thedrr;
74 zio_cksum_t stream_cksum;
75 int c;
76 boolean_t verbose = B_FALSE;
77
78 while ((c = getopt(argc, argv, "v")) != -1) {
79 switch (c) {
80 case 'v':
81 verbose = B_TRUE;
82 break;
83 case '?':
84 (void) fprintf(stderr, "invalid option '%c'\n",
85 optopt);
86 zstream_usage();
87 break;
88 }
89 }
90
91 argc -= optind;
92 argv += optind;
93
94 if (argc < 0)
95 zstream_usage();
96
97 if (hcreate(argc) == 0)
98 errx(1, "hcreate");
99 for (int i = 0; i < argc; i++) {
100 uint64_t object, offset;
101 char *obj_str;
102 char *offset_str;
103 char *key;
104 char *end;
105 enum zio_compress type = ZIO_COMPRESS_LZ4;
106
107 obj_str = strsep(&argv[i], ",");
108 if (argv[i] == NULL) {
109 zstream_usage();
110 exit(2);
111 }
112 errno = 0;
113 object = strtoull(obj_str, &end, 0);
114 if (errno || *end != '\0')
115 errx(1, "invalid value for object");
116 offset_str = strsep(&argv[i], ",");
117 offset = strtoull(offset_str, &end, 0);
118 if (errno || *end != '\0')
119 errx(1, "invalid value for offset");
120 if (argv[i]) {
121 if (0 == strcmp("off", argv[i]))
122 type = ZIO_COMPRESS_OFF;
123 else if (0 == strcmp("lz4", argv[i]))
124 type = ZIO_COMPRESS_LZ4;
125 else if (0 == strcmp("lzjb", argv[i]))
126 type = ZIO_COMPRESS_LZJB;
127 else if (0 == strcmp("gzip", argv[i]))
128 type = ZIO_COMPRESS_GZIP_1;
129 else if (0 == strcmp("zle", argv[i]))
130 type = ZIO_COMPRESS_ZLE;
131 else if (0 == strcmp("zstd", argv[i]))
132 type = ZIO_COMPRESS_ZSTD;
133 else {
134 fprintf(stderr, "Invalid compression type %s.\n"
135 "Supported types are off, lz4, lzjb, gzip, "
136 "zle, and zstd\n",
137 argv[i]);
138 exit(2);
139 }
140 }
141
142 if (asprintf(&key, "%llu,%llu", (u_longlong_t)object,
143 (u_longlong_t)offset) < 0) {
144 err(1, "asprintf");
145 }
146 ENTRY e = {.key = key};
147 ENTRY *p;
148
149 p = hsearch(e, ENTER);
150 if (p == NULL)
151 errx(1, "hsearch");
152 p->data = (void*)(intptr_t)type;
153 }
154
155 if (isatty(STDIN_FILENO)) {
156 (void) fprintf(stderr,
157 "Error: The send stream is a binary format "
158 "and can not be read from a\n"
159 "terminal. Standard input must be redirected.\n");
160 exit(1);
161 }
162
163 fletcher_4_init();
164 int begin = 0;
165 boolean_t seen = B_FALSE;
166 while (sfread(drr, sizeof (*drr), stdin) != 0) {
167 struct drr_write *drrw;
168 uint64_t payload_size = 0;
169
170 /*
171 * We need to regenerate the checksum.
172 */
173 if (drr->drr_type != DRR_BEGIN) {
174 memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
175 sizeof (drr->drr_u.drr_checksum.drr_checksum));
176 }
177
178 switch (drr->drr_type) {
179 case DRR_BEGIN:
180 {
181 ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
182 VERIFY0(begin++);
183 seen = B_TRUE;
184
185 uint32_t sz = drr->drr_payloadlen;
186
187 VERIFY3U(sz, <=, 1U << 28);
188
189 if (sz != 0) {
190 if (sz > bufsz) {
191 buf = realloc(buf, sz);
192 if (buf == NULL)
193 err(1, "realloc");
194 bufsz = sz;
195 }
196 (void) sfread(buf, sz, stdin);
197 }
198 payload_size = sz;
199 break;
200 }
201 case DRR_END:
202 {
203 struct drr_end *drre = &drr->drr_u.drr_end;
204 /*
205 * We would prefer to just check --begin == 0, but
206 * replication streams have an end of stream END
207 * record, so we must avoid tripping it.
208 */
209 VERIFY3B(seen, ==, B_TRUE);
210 begin--;
211 /*
212 * Use the recalculated checksum, unless this is
213 * the END record of a stream package, which has
214 * no checksum.
215 */
216 if (!ZIO_CHECKSUM_IS_ZERO(&drre->drr_checksum))
217 drre->drr_checksum = stream_cksum;
218 break;
219 }
220
221 case DRR_OBJECT:
222 {
223 struct drr_object *drro = &drr->drr_u.drr_object;
224 VERIFY3S(begin, ==, 1);
225
226 if (drro->drr_bonuslen > 0) {
227 payload_size = DRR_OBJECT_PAYLOAD_SIZE(drro);
228 (void) sfread(buf, payload_size, stdin);
229 }
230 break;
231 }
232
233 case DRR_SPILL:
234 {
235 struct drr_spill *drrs = &drr->drr_u.drr_spill;
236 VERIFY3S(begin, ==, 1);
237 payload_size = DRR_SPILL_PAYLOAD_SIZE(drrs);
238 (void) sfread(buf, payload_size, stdin);
239 break;
240 }
241
242 case DRR_WRITE_BYREF:
243 VERIFY3S(begin, ==, 1);
244 fprintf(stderr,
245 "Deduplicated streams are not supported\n");
246 exit(1);
247 break;
248
249 case DRR_WRITE:
250 {
251 VERIFY3S(begin, ==, 1);
252 drrw = &thedrr.drr_u.drr_write;
253 payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw);
254 ENTRY *p;
255 char key[KEYSIZE];
256
257 snprintf(key, KEYSIZE, "%llu,%llu",
258 (u_longlong_t)drrw->drr_object,
259 (u_longlong_t)drrw->drr_offset);
260 ENTRY e = {.key = key};
261
262 p = hsearch(e, FIND);
263 if (p == NULL) {
264 /*
265 * Read the contents of the block unaltered
266 */
267 (void) sfread(buf, payload_size, stdin);
268 break;
269 }
270
271 /*
272 * Read and decompress the block
273 */
274 enum zio_compress c =
275 (enum zio_compress)(intptr_t)p->data;
276
277 if (c == ZIO_COMPRESS_OFF) {
278 (void) sfread(buf, payload_size, stdin);
279 drrw->drr_compressiontype = 0;
280 drrw->drr_compressed_size = 0;
281 if (verbose)
282 fprintf(stderr,
283 "Resetting compression type to "
284 "off for ino %llu offset %llu\n",
285 (u_longlong_t)drrw->drr_object,
286 (u_longlong_t)drrw->drr_offset);
287 break;
288 }
289
290 uint64_t lsize = drrw->drr_logical_size;
291 ASSERT3U(payload_size, <=, lsize);
292
293 char *lzbuf = safe_calloc(payload_size);
294 (void) sfread(lzbuf, payload_size, stdin);
295
296 abd_t sabd, dabd;
297 abd_get_from_buf_struct(&sabd, lzbuf, payload_size);
298 abd_get_from_buf_struct(&dabd, buf, lsize);
299 int err = zio_decompress_data(c, &sabd, &dabd,
300 payload_size, lsize, NULL);
301 abd_free(&dabd);
302 abd_free(&sabd);
303
304 if (err == 0) {
305 drrw->drr_compressiontype = 0;
306 drrw->drr_compressed_size = 0;
307 payload_size = lsize;
308 if (verbose) {
309 fprintf(stderr,
310 "successfully decompressed "
311 "ino %llu offset %llu\n",
312 (u_longlong_t)drrw->drr_object,
313 (u_longlong_t)drrw->drr_offset);
314 }
315 } else {
316 /*
317 * The block must not be compressed, at least
318 * not with this compression type, possibly
319 * because it gets written multiple times in
320 * this stream.
321 */
322 warnx("decompression failed for "
323 "ino %llu offset %llu",
324 (u_longlong_t)drrw->drr_object,
325 (u_longlong_t)drrw->drr_offset);
326 memcpy(buf, lzbuf, payload_size);
327 }
328
329 free(lzbuf);
330 break;
331 }
332
333 case DRR_WRITE_EMBEDDED:
334 {
335 VERIFY3S(begin, ==, 1);
336 struct drr_write_embedded *drrwe =
337 &drr->drr_u.drr_write_embedded;
338 payload_size =
339 P2ROUNDUP((uint64_t)drrwe->drr_psize, 8);
340 (void) sfread(buf, payload_size, stdin);
341 break;
342 }
343
344 case DRR_FREEOBJECTS:
345 case DRR_FREE:
346 case DRR_OBJECT_RANGE:
347 VERIFY3S(begin, ==, 1);
348 break;
349
350 default:
351 (void) fprintf(stderr, "INVALID record type 0x%x\n",
352 drr->drr_type);
353 /* should never happen, so assert */
354 assert(B_FALSE);
355 }
356
357 if (feof(stdout)) {
358 fprintf(stderr, "Error: unexpected end-of-file\n");
359 exit(1);
360 }
361 if (ferror(stdout)) {
362 fprintf(stderr, "Error while reading file: %s\n",
363 strerror(errno));
364 exit(1);
365 }
366
367 /*
368 * We need to recalculate the checksum, and it needs to be
369 * initially zero to do that. BEGIN records don't have
370 * a checksum.
371 */
372 if (drr->drr_type != DRR_BEGIN) {
373 memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
374 sizeof (drr->drr_u.drr_checksum.drr_checksum));
375 }
376 if (dump_record(drr, buf, payload_size,
377 &stream_cksum, STDOUT_FILENO) != 0)
378 break;
379 if (drr->drr_type == DRR_END) {
380 /*
381 * Typically the END record is either the last
382 * thing in the stream, or it is followed
383 * by a BEGIN record (which also zeros the checksum).
384 * However, a stream package ends with two END
385 * records. The last END record's checksum starts
386 * from zero.
387 */
388 ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
389 }
390 }
391 free(buf);
392 fletcher_4_fini();
393 hdestroy();
394
395 return (0);
396 }
397