xref: /freebsd/sys/contrib/openzfs/cmd/zstream/zstream_recompress.c (revision a1f8a0c793c67ab5854035e017f34d3d016b6d0d)
1  /*
2   * CDDL HEADER START
3   *
4   * The contents of this file are subject to the terms of the
5   * Common Development and Distribution License (the "License").
6   * You may not use this file except in compliance with the License.
7   *
8   * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9   * or https://opensource.org/licenses/CDDL-1.0.
10   * See the License for the specific language governing permissions
11   * and limitations under the License.
12   *
13   * When distributing Covered Code, include this CDDL HEADER in each
14   * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15   * If applicable, add the following below this CDDL HEADER, with the
16   * fields enclosed by brackets "[]" replaced with your own identifying
17   * information: Portions Copyright [yyyy] [name of copyright owner]
18   *
19   * CDDL HEADER END
20   */
21  
22  /*
23   * Copyright 2022 Axcient.  All rights reserved.
24   * Use is subject to license terms.
25   */
26  
27  /*
28   * Copyright (c) 2022 by Delphix. All rights reserved.
29   */
30  
31  #include <err.h>
32  #include <stdio.h>
33  #include <stdlib.h>
34  #include <unistd.h>
35  #include <sys/zfs_ioctl.h>
36  #include <sys/zio_checksum.h>
37  #include <sys/zstd/zstd.h>
38  #include "zfs_fletcher.h"
39  #include "zstream.h"
40  
41  static int
42  dump_record(dmu_replay_record_t *drr, void *payload, int payload_len,
43      zio_cksum_t *zc, int outfd)
44  {
45  	assert(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum)
46  	    == sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
47  	fletcher_4_incremental_native(drr,
48  	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), zc);
49  	if (drr->drr_type != DRR_BEGIN) {
50  		assert(ZIO_CHECKSUM_IS_ZERO(&drr->drr_u.
51  		    drr_checksum.drr_checksum));
52  		drr->drr_u.drr_checksum.drr_checksum = *zc;
53  	}
54  	fletcher_4_incremental_native(&drr->drr_u.drr_checksum.drr_checksum,
55  	    sizeof (zio_cksum_t), zc);
56  	if (write(outfd, drr, sizeof (*drr)) == -1)
57  		return (errno);
58  	if (payload_len != 0) {
59  		fletcher_4_incremental_native(payload, payload_len, zc);
60  		if (write(outfd, payload, payload_len) == -1)
61  			return (errno);
62  	}
63  	return (0);
64  }
65  
66  int
67  zstream_do_recompress(int argc, char *argv[])
68  {
69  	int bufsz = SPA_MAXBLOCKSIZE;
70  	char *buf = safe_malloc(bufsz);
71  	dmu_replay_record_t thedrr;
72  	dmu_replay_record_t *drr = &thedrr;
73  	zio_cksum_t stream_cksum;
74  	int c;
75  	int level = -1;
76  
77  	while ((c = getopt(argc, argv, "l:")) != -1) {
78  		switch (c) {
79  		case 'l':
80  			if (sscanf(optarg, "%d", &level) != 0) {
81  				fprintf(stderr,
82  				    "failed to parse level '%s'\n",
83  				    optarg);
84  				zstream_usage();
85  			}
86  			break;
87  		case '?':
88  			(void) fprintf(stderr, "invalid option '%c'\n",
89  			    optopt);
90  			zstream_usage();
91  			break;
92  		}
93  	}
94  
95  	argc -= optind;
96  	argv += optind;
97  
98  	if (argc != 1)
99  		zstream_usage();
100  	int type = 0;
101  	zio_compress_info_t *cinfo = NULL;
102  	if (0 == strcmp(argv[0], "off")) {
103  		type = ZIO_COMPRESS_OFF;
104  		cinfo = &zio_compress_table[type];
105  	} else if (0 == strcmp(argv[0], "inherit") ||
106  	    0 == strcmp(argv[0], "empty") ||
107  	    0 == strcmp(argv[0], "on")) {
108  		// Fall through to invalid compression type case
109  	} else {
110  		for (int i = 0; i < ZIO_COMPRESS_FUNCTIONS; i++) {
111  			if (0 == strcmp(zio_compress_table[i].ci_name,
112  			    argv[0])) {
113  				cinfo = &zio_compress_table[i];
114  				type = i;
115  				break;
116  			}
117  		}
118  	}
119  	if (cinfo == NULL) {
120  		fprintf(stderr, "Invalid compression type %s.\n",
121  		    argv[0]);
122  		exit(2);
123  	}
124  
125  	if (cinfo->ci_compress == NULL) {
126  		type = 0;
127  		cinfo = &zio_compress_table[0];
128  	}
129  
130  	if (isatty(STDIN_FILENO)) {
131  		(void) fprintf(stderr,
132  		    "Error: The send stream is a binary format "
133  		    "and can not be read from a\n"
134  		    "terminal.  Standard input must be redirected.\n");
135  		exit(1);
136  	}
137  
138  	fletcher_4_init();
139  	zio_init();
140  	zstd_init();
141  	int begin = 0;
142  	boolean_t seen = B_FALSE;
143  	while (sfread(drr, sizeof (*drr), stdin) != 0) {
144  		struct drr_write *drrw;
145  		uint64_t payload_size = 0;
146  
147  		/*
148  		 * We need to regenerate the checksum.
149  		 */
150  		if (drr->drr_type != DRR_BEGIN) {
151  			memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
152  			    sizeof (drr->drr_u.drr_checksum.drr_checksum));
153  		}
154  
155  
156  		switch (drr->drr_type) {
157  		case DRR_BEGIN:
158  		{
159  			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
160  			VERIFY0(begin++);
161  			seen = B_TRUE;
162  
163  			uint32_t sz = drr->drr_payloadlen;
164  
165  			VERIFY3U(sz, <=, 1U << 28);
166  
167  			if (sz != 0) {
168  				if (sz > bufsz) {
169  					buf = realloc(buf, sz);
170  					if (buf == NULL)
171  						err(1, "realloc");
172  					bufsz = sz;
173  				}
174  				(void) sfread(buf, sz, stdin);
175  			}
176  			payload_size = sz;
177  			break;
178  		}
179  		case DRR_END:
180  		{
181  			struct drr_end *drre = &drr->drr_u.drr_end;
182  			/*
183  			 * We would prefer to just check --begin == 0, but
184  			 * replication streams have an end of stream END
185  			 * record, so we must avoid tripping it.
186  			 */
187  			VERIFY3B(seen, ==, B_TRUE);
188  			begin--;
189  			/*
190  			 * Use the recalculated checksum, unless this is
191  			 * the END record of a stream package, which has
192  			 * no checksum.
193  			 */
194  			if (!ZIO_CHECKSUM_IS_ZERO(&drre->drr_checksum))
195  				drre->drr_checksum = stream_cksum;
196  			break;
197  		}
198  
199  		case DRR_OBJECT:
200  		{
201  			struct drr_object *drro = &drr->drr_u.drr_object;
202  			VERIFY3S(begin, ==, 1);
203  
204  			if (drro->drr_bonuslen > 0) {
205  				payload_size = DRR_OBJECT_PAYLOAD_SIZE(drro);
206  				(void) sfread(buf, payload_size, stdin);
207  			}
208  			break;
209  		}
210  
211  		case DRR_SPILL:
212  		{
213  			struct drr_spill *drrs = &drr->drr_u.drr_spill;
214  			VERIFY3S(begin, ==, 1);
215  			payload_size = DRR_SPILL_PAYLOAD_SIZE(drrs);
216  			(void) sfread(buf, payload_size, stdin);
217  			break;
218  		}
219  
220  		case DRR_WRITE_BYREF:
221  			VERIFY3S(begin, ==, 1);
222  			fprintf(stderr,
223  			    "Deduplicated streams are not supported\n");
224  			exit(1);
225  			break;
226  
227  		case DRR_WRITE:
228  		{
229  			VERIFY3S(begin, ==, 1);
230  			drrw = &thedrr.drr_u.drr_write;
231  			payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw);
232  			/*
233  			 * In order to recompress an encrypted block, you have
234  			 * to decrypt, decompress, recompress, and
235  			 * re-encrypt. That can be a future enhancement (along
236  			 * with decryption or re-encryption), but for now we
237  			 * skip encrypted blocks.
238  			 */
239  			boolean_t encrypted = B_FALSE;
240  			for (int i = 0; i < ZIO_DATA_SALT_LEN; i++) {
241  				if (drrw->drr_salt[i] != 0) {
242  					encrypted = B_TRUE;
243  					break;
244  				}
245  			}
246  			if (encrypted) {
247  				(void) sfread(buf, payload_size, stdin);
248  				break;
249  			}
250  			if (drrw->drr_compressiontype >=
251  			    ZIO_COMPRESS_FUNCTIONS) {
252  				fprintf(stderr, "Invalid compression type in "
253  				    "stream: %d\n", drrw->drr_compressiontype);
254  				exit(3);
255  			}
256  			zio_compress_info_t *dinfo =
257  			    &zio_compress_table[drrw->drr_compressiontype];
258  
259  			/* Set up buffers to minimize memcpys */
260  			char *cbuf, *dbuf;
261  			if (cinfo->ci_compress == NULL)
262  				dbuf = buf;
263  			else
264  				dbuf = safe_calloc(bufsz);
265  
266  			if (dinfo->ci_decompress == NULL)
267  				cbuf = dbuf;
268  			else
269  				cbuf = safe_calloc(payload_size);
270  
271  			/* Read and decompress the payload */
272  			(void) sfread(cbuf, payload_size, stdin);
273  			if (dinfo->ci_decompress != NULL) {
274  				if (0 != dinfo->ci_decompress(cbuf, dbuf,
275  				    payload_size, MIN(bufsz,
276  				    drrw->drr_logical_size), dinfo->ci_level)) {
277  					warnx("decompression type %d failed "
278  					    "for ino %llu offset %llu",
279  					    type,
280  					    (u_longlong_t)drrw->drr_object,
281  					    (u_longlong_t)drrw->drr_offset);
282  					exit(4);
283  				}
284  				payload_size = drrw->drr_logical_size;
285  				free(cbuf);
286  			}
287  
288  			/* Recompress the payload */
289  			if (cinfo->ci_compress != NULL) {
290  				payload_size = P2ROUNDUP(cinfo->ci_compress(
291  				    dbuf, buf, drrw->drr_logical_size,
292  				    MIN(payload_size, bufsz), (level == -1 ?
293  				    cinfo->ci_level : level)),
294  				    SPA_MINBLOCKSIZE);
295  				if (payload_size != drrw->drr_logical_size) {
296  					drrw->drr_compressiontype = type;
297  					drrw->drr_compressed_size =
298  					    payload_size;
299  				} else {
300  					memcpy(buf, dbuf, payload_size);
301  					drrw->drr_compressiontype = 0;
302  					drrw->drr_compressed_size = 0;
303  				}
304  				free(dbuf);
305  			} else {
306  				drrw->drr_compressiontype = type;
307  				drrw->drr_compressed_size = 0;
308  			}
309  			break;
310  		}
311  
312  		case DRR_WRITE_EMBEDDED:
313  		{
314  			struct drr_write_embedded *drrwe =
315  			    &drr->drr_u.drr_write_embedded;
316  			VERIFY3S(begin, ==, 1);
317  			payload_size =
318  			    P2ROUNDUP((uint64_t)drrwe->drr_psize, 8);
319  			(void) sfread(buf, payload_size, stdin);
320  			break;
321  		}
322  
323  		case DRR_FREEOBJECTS:
324  		case DRR_FREE:
325  		case DRR_OBJECT_RANGE:
326  			VERIFY3S(begin, ==, 1);
327  			break;
328  
329  		default:
330  			(void) fprintf(stderr, "INVALID record type 0x%x\n",
331  			    drr->drr_type);
332  			/* should never happen, so assert */
333  			assert(B_FALSE);
334  		}
335  
336  		if (feof(stdout)) {
337  			fprintf(stderr, "Error: unexpected end-of-file\n");
338  			exit(1);
339  		}
340  		if (ferror(stdout)) {
341  			fprintf(stderr, "Error while reading file: %s\n",
342  			    strerror(errno));
343  			exit(1);
344  		}
345  
346  		/*
347  		 * We need to recalculate the checksum, and it needs to be
348  		 * initially zero to do that.  BEGIN records don't have
349  		 * a checksum.
350  		 */
351  		if (drr->drr_type != DRR_BEGIN) {
352  			memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
353  			    sizeof (drr->drr_u.drr_checksum.drr_checksum));
354  		}
355  		if (dump_record(drr, buf, payload_size,
356  		    &stream_cksum, STDOUT_FILENO) != 0)
357  			break;
358  		if (drr->drr_type == DRR_END) {
359  			/*
360  			 * Typically the END record is either the last
361  			 * thing in the stream, or it is followed
362  			 * by a BEGIN record (which also zeros the checksum).
363  			 * However, a stream package ends with two END
364  			 * records.  The last END record's checksum starts
365  			 * from zero.
366  			 */
367  			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
368  		}
369  	}
370  	free(buf);
371  	fletcher_4_fini();
372  	zio_fini();
373  	zstd_fini();
374  
375  	return (0);
376  }
377