1 // SPDX-License-Identifier: CDDL-1.0
2 /*
3 * CDDL HEADER START
4 *
5 * The contents of this file are subject to the terms of the
6 * Common Development and Distribution License (the "License").
7 * You may not use this file except in compliance with the License.
8 *
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or https://opensource.org/licenses/CDDL-1.0.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
13 *
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 *
20 * CDDL HEADER END
21 */
22
23 /*
24 * Copyright 2022 Axcient. All rights reserved.
25 * Use is subject to license terms.
26 *
27 * Copyright (c) 2022 by Delphix. All rights reserved.
28 * Copyright (c) 2024, Klara, Inc.
29 */
30
31 #include <err.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <unistd.h>
35 #include <sys/zfs_ioctl.h>
36 #include <sys/zio_checksum.h>
37 #include <sys/zstd/zstd.h>
38 #include "zfs_fletcher.h"
39 #include "zstream.h"
40
41 static int
dump_record(dmu_replay_record_t * drr,void * payload,int payload_len,zio_cksum_t * zc,int outfd)42 dump_record(dmu_replay_record_t *drr, void *payload, int payload_len,
43 zio_cksum_t *zc, int outfd)
44 {
45 assert(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum)
46 == sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
47 fletcher_4_incremental_native(drr,
48 offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), zc);
49 if (drr->drr_type != DRR_BEGIN) {
50 assert(ZIO_CHECKSUM_IS_ZERO(&drr->drr_u.
51 drr_checksum.drr_checksum));
52 drr->drr_u.drr_checksum.drr_checksum = *zc;
53 }
54 fletcher_4_incremental_native(&drr->drr_u.drr_checksum.drr_checksum,
55 sizeof (zio_cksum_t), zc);
56 if (write(outfd, drr, sizeof (*drr)) == -1)
57 return (errno);
58 if (payload_len != 0) {
59 fletcher_4_incremental_native(payload, payload_len, zc);
60 if (write(outfd, payload, payload_len) == -1)
61 return (errno);
62 }
63 return (0);
64 }
65
66 int
zstream_do_recompress(int argc,char * argv[])67 zstream_do_recompress(int argc, char *argv[])
68 {
69 int bufsz = SPA_MAXBLOCKSIZE;
70 char *buf = safe_malloc(bufsz);
71 dmu_replay_record_t thedrr;
72 dmu_replay_record_t *drr = &thedrr;
73 zio_cksum_t stream_cksum;
74 int c;
75 int level = 0;
76
77 while ((c = getopt(argc, argv, "l:")) != -1) {
78 switch (c) {
79 case 'l':
80 if (sscanf(optarg, "%d", &level) != 1) {
81 fprintf(stderr,
82 "failed to parse level '%s'\n",
83 optarg);
84 zstream_usage();
85 }
86 break;
87 case '?':
88 (void) fprintf(stderr, "invalid option '%c'\n",
89 optopt);
90 zstream_usage();
91 break;
92 }
93 }
94
95 argc -= optind;
96 argv += optind;
97
98 if (argc != 1)
99 zstream_usage();
100
101 enum zio_compress ctype;
102 if (strcmp(argv[0], "off") == 0) {
103 ctype = ZIO_COMPRESS_OFF;
104 } else {
105 for (ctype = 0; ctype < ZIO_COMPRESS_FUNCTIONS; ctype++) {
106 if (strcmp(argv[0],
107 zio_compress_table[ctype].ci_name) == 0)
108 break;
109 }
110 if (ctype == ZIO_COMPRESS_FUNCTIONS ||
111 zio_compress_table[ctype].ci_compress == NULL) {
112 fprintf(stderr, "Invalid compression type %s.\n",
113 argv[0]);
114 exit(2);
115 }
116 }
117
118 if (isatty(STDIN_FILENO)) {
119 (void) fprintf(stderr,
120 "Error: The send stream is a binary format "
121 "and can not be read from a\n"
122 "terminal. Standard input must be redirected.\n");
123 exit(1);
124 }
125
126 abd_init();
127 fletcher_4_init();
128 zio_init();
129 zstd_init();
130 int begin = 0;
131 boolean_t seen = B_FALSE;
132 while (sfread(drr, sizeof (*drr), stdin) != 0) {
133 struct drr_write *drrw;
134 uint64_t payload_size = 0;
135
136 /*
137 * We need to regenerate the checksum.
138 */
139 if (drr->drr_type != DRR_BEGIN) {
140 memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
141 sizeof (drr->drr_u.drr_checksum.drr_checksum));
142 }
143
144
145 switch (drr->drr_type) {
146 case DRR_BEGIN:
147 {
148 ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
149 VERIFY0(begin++);
150 seen = B_TRUE;
151
152 uint32_t sz = drr->drr_payloadlen;
153
154 VERIFY3U(sz, <=, 1U << 28);
155
156 if (sz != 0) {
157 if (sz > bufsz) {
158 buf = realloc(buf, sz);
159 if (buf == NULL)
160 err(1, "realloc");
161 bufsz = sz;
162 }
163 (void) sfread(buf, sz, stdin);
164 }
165 payload_size = sz;
166 break;
167 }
168 case DRR_END:
169 {
170 struct drr_end *drre = &drr->drr_u.drr_end;
171 /*
172 * We would prefer to just check --begin == 0, but
173 * replication streams have an end of stream END
174 * record, so we must avoid tripping it.
175 */
176 VERIFY3B(seen, ==, B_TRUE);
177 begin--;
178 /*
179 * Use the recalculated checksum, unless this is
180 * the END record of a stream package, which has
181 * no checksum.
182 */
183 if (!ZIO_CHECKSUM_IS_ZERO(&drre->drr_checksum))
184 drre->drr_checksum = stream_cksum;
185 break;
186 }
187
188 case DRR_OBJECT:
189 {
190 struct drr_object *drro = &drr->drr_u.drr_object;
191 VERIFY3S(begin, ==, 1);
192
193 if (drro->drr_bonuslen > 0) {
194 payload_size = DRR_OBJECT_PAYLOAD_SIZE(drro);
195 (void) sfread(buf, payload_size, stdin);
196 }
197 break;
198 }
199
200 case DRR_SPILL:
201 {
202 struct drr_spill *drrs = &drr->drr_u.drr_spill;
203 VERIFY3S(begin, ==, 1);
204 payload_size = DRR_SPILL_PAYLOAD_SIZE(drrs);
205 (void) sfread(buf, payload_size, stdin);
206 break;
207 }
208
209 case DRR_WRITE_BYREF:
210 VERIFY3S(begin, ==, 1);
211 fprintf(stderr,
212 "Deduplicated streams are not supported\n");
213 exit(1);
214 break;
215
216 case DRR_WRITE:
217 {
218 VERIFY3S(begin, ==, 1);
219 drrw = &thedrr.drr_u.drr_write;
220 payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw);
221 /*
222 * In order to recompress an encrypted block, you have
223 * to decrypt, decompress, recompress, and
224 * re-encrypt. That can be a future enhancement (along
225 * with decryption or re-encryption), but for now we
226 * skip encrypted blocks.
227 */
228 boolean_t encrypted = B_FALSE;
229 for (int i = 0; i < ZIO_DATA_SALT_LEN; i++) {
230 if (drrw->drr_salt[i] != 0) {
231 encrypted = B_TRUE;
232 break;
233 }
234 }
235 if (encrypted) {
236 (void) sfread(buf, payload_size, stdin);
237 break;
238 }
239 enum zio_compress dtype = drrw->drr_compressiontype;
240 if (dtype >= ZIO_COMPRESS_FUNCTIONS) {
241 fprintf(stderr, "Invalid compression type in "
242 "stream: %d\n", dtype);
243 exit(3);
244 }
245 if (zio_compress_table[dtype].ci_decompress == NULL)
246 dtype = ZIO_COMPRESS_OFF;
247
248 /* Set up buffers to minimize memcpys */
249 char *cbuf, *dbuf;
250 if (ctype == ZIO_COMPRESS_OFF)
251 dbuf = buf;
252 else
253 dbuf = safe_calloc(bufsz);
254
255 if (dtype == ZIO_COMPRESS_OFF)
256 cbuf = dbuf;
257 else
258 cbuf = safe_calloc(payload_size);
259
260 /* Read and decompress the payload */
261 (void) sfread(cbuf, payload_size, stdin);
262 if (dtype != ZIO_COMPRESS_OFF) {
263 abd_t cabd, dabd;
264 abd_get_from_buf_struct(&cabd,
265 cbuf, payload_size);
266 abd_get_from_buf_struct(&dabd, dbuf,
267 MIN(bufsz, drrw->drr_logical_size));
268 if (zio_decompress_data(dtype, &cabd, &dabd,
269 payload_size, abd_get_size(&dabd),
270 NULL) != 0) {
271 warnx("decompression type %d failed "
272 "for ino %llu offset %llu",
273 dtype,
274 (u_longlong_t)drrw->drr_object,
275 (u_longlong_t)drrw->drr_offset);
276 exit(4);
277 }
278 payload_size = drrw->drr_logical_size;
279 abd_free(&dabd);
280 abd_free(&cabd);
281 free(cbuf);
282 }
283
284 /* Recompress the payload */
285 if (ctype != ZIO_COMPRESS_OFF) {
286 abd_t dabd, abd;
287 abd_get_from_buf_struct(&dabd,
288 dbuf, drrw->drr_logical_size);
289 abd_t *pabd =
290 abd_get_from_buf_struct(&abd, buf, bufsz);
291 size_t csize = zio_compress_data(ctype, &dabd,
292 &pabd, drrw->drr_logical_size,
293 drrw->drr_logical_size, level);
294 size_t rounded =
295 P2ROUNDUP(csize, SPA_MINBLOCKSIZE);
296 if (rounded >= drrw->drr_logical_size) {
297 memcpy(buf, dbuf, payload_size);
298 drrw->drr_compressiontype = 0;
299 drrw->drr_compressed_size = 0;
300 } else {
301 abd_zero_off(pabd, csize,
302 rounded - csize);
303 drrw->drr_compressiontype = ctype;
304 drrw->drr_compressed_size =
305 payload_size = rounded;
306 }
307 abd_free(&abd);
308 abd_free(&dabd);
309 free(dbuf);
310 } else {
311 drrw->drr_compressiontype = 0;
312 drrw->drr_compressed_size = 0;
313 }
314 break;
315 }
316
317 case DRR_WRITE_EMBEDDED:
318 {
319 struct drr_write_embedded *drrwe =
320 &drr->drr_u.drr_write_embedded;
321 VERIFY3S(begin, ==, 1);
322 payload_size =
323 P2ROUNDUP((uint64_t)drrwe->drr_psize, 8);
324 (void) sfread(buf, payload_size, stdin);
325 break;
326 }
327
328 case DRR_FREEOBJECTS:
329 case DRR_FREE:
330 case DRR_OBJECT_RANGE:
331 VERIFY3S(begin, ==, 1);
332 break;
333
334 default:
335 (void) fprintf(stderr, "INVALID record type 0x%x\n",
336 drr->drr_type);
337 /* should never happen, so assert */
338 assert(B_FALSE);
339 }
340
341 if (feof(stdout)) {
342 fprintf(stderr, "Error: unexpected end-of-file\n");
343 exit(1);
344 }
345 if (ferror(stdout)) {
346 fprintf(stderr, "Error while reading file: %s\n",
347 strerror(errno));
348 exit(1);
349 }
350
351 /*
352 * We need to recalculate the checksum, and it needs to be
353 * initially zero to do that. BEGIN records don't have
354 * a checksum.
355 */
356 if (drr->drr_type != DRR_BEGIN) {
357 memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
358 sizeof (drr->drr_u.drr_checksum.drr_checksum));
359 }
360 if (dump_record(drr, buf, payload_size,
361 &stream_cksum, STDOUT_FILENO) != 0)
362 break;
363 if (drr->drr_type == DRR_END) {
364 /*
365 * Typically the END record is either the last
366 * thing in the stream, or it is followed
367 * by a BEGIN record (which also zeros the checksum).
368 * However, a stream package ends with two END
369 * records. The last END record's checksum starts
370 * from zero.
371 */
372 ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
373 }
374 }
375 free(buf);
376 fletcher_4_fini();
377 zio_fini();
378 zstd_fini();
379 abd_fini();
380
381 return (0);
382 }
383