xref: /freebsd/sys/contrib/openzfs/cmd/zstream/zstream_redup.c (revision 7fdf597e96a02165cfe22ff357b857d5fa15ed8a)
1 /*
2  * CDDL HEADER START
3  *
4  * This file and its contents are supplied under the terms of the
5  * Common Development and Distribution License ("CDDL"), version 1.0.
6  * You may only use this file in accordance with the terms of version
7  * 1.0 of the CDDL.
8  *
9  * A full copy of the text of the CDDL should have accompanied this
10  * source.  A copy of the CDDL is also available via the Internet at
11  * http://www.illumos.org/license/CDDL.
12  *
13  * CDDL HEADER END
14  */
15 
16 /*
17  * Copyright (c) 2020 by Delphix. All rights reserved.
18  */
19 
20 #include <assert.h>
21 #include <cityhash.h>
22 #include <ctype.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <libzfs.h>
26 #include <libzutil.h>
27 #include <stddef.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <umem.h>
32 #include <unistd.h>
33 #include <sys/debug.h>
34 #include <sys/stat.h>
35 #include <sys/zfs_ioctl.h>
36 #include <sys/zio_checksum.h>
37 #include "zfs_fletcher.h"
38 #include "zstream.h"
39 
40 
/*
 * Limits on the memory budget used to size the redup hash table.  On
 * non-ILP32 builds the budget is MAX_RDT_PHYSMEM_PERCENT percent of physical
 * memory, but never less than SMALLEST_POSSIBLE_MAX_RDT_MB megabytes; ILP32
 * builds use SMALLEST_POSSIBLE_MAX_RDT_MB megabytes directly.
 */
#define	MAX_RDT_PHYSMEM_PERCENT		20
#define	SMALLEST_POSSIBLE_MAX_RDT_MB		128
43 
/*
 * One entry of the redup hash table.  The (rde_guid, rde_object, rde_offset)
 * triple is the lookup key; rde_stream_offset is the value stored for it,
 * which the conversion loop sets to the input-file position at which the
 * corresponding WRITE record begins.
 */
typedef struct redup_entry {
	/* Next entry chained in the same hash bucket, or NULL. */
	struct redup_entry	*rde_next;
	uint64_t rde_guid;
	uint64_t rde_object;
	uint64_t rde_offset;
	uint64_t rde_stream_offset;
} redup_entry_t;
51 
/*
 * Chained hash table mapping (guid, object, offset) to the position of a
 * WRITE record in the input stream.
 */
typedef struct redup_table {
	/* Array of bucket heads, indexed by the low bits of the key's hash. */
	redup_entry_t	**redup_hash_array;
	/* umem cache from which redup_entry_t objects are allocated. */
	umem_cache_t	*ddecache;
	/* Number of entries inserted so far. */
	uint64_t	ddt_count;
	/* Number of low-order hash bits used to select a bucket. */
	int		numhashbits;
} redup_table_t;
58 
/*
 * Allocate n bytes of zero-filled memory, exiting on failure.
 *
 * Returns a pointer to the new allocation, which the caller releases with
 * free().  If the allocation fails, an error is printed to stderr and the
 * process exits with status 1, so NULL is never returned.
 */
void *
safe_calloc(size_t n)
{
	void *rv = calloc(1, n);
	if (rv == NULL) {
		/* %zu matches size_t; casting to int would truncate. */
		(void) fprintf(stderr,
		    "Error: could not allocate %zu bytes of memory\n", n);
		exit(1);
	}
	return (rv);
}
71 
/*
 * fread() wrapper that reads a single item of `size' bytes from fp into buf.
 *
 * Returns the item count reported by fread(): 1 when the whole item was
 * read, 0 when it was not and the stream has no error flagged (e.g. at
 * end-of-file).  If nothing was read and the stream's error indicator is
 * set, a message is printed to stderr and the process exits with status 1.
 */
int
sfread(void *buf, size_t size, FILE *fp)
{
	size_t items = fread(buf, size, 1, fp);

	if (items != 0)
		return ((int)items);

	/* Nothing read: only a stream error is fatal. */
	if (!ferror(fp))
		return (0);

	(void) fprintf(stderr, "Error while reading file: %s\n",
	    strerror(errno));
	exit(1);
}
86 
/*
 * Safe version of pread(): read exactly `count' bytes from fd, starting at
 * file position `offset', into buf.
 *
 * A single pread() is allowed to return fewer bytes than requested, so the
 * read is continued until the request is complete.  The process exits with
 * status 1 if pread() fails (other than being interrupted by a signal) or
 * if end-of-file is reached before `count' bytes have been read.
 */
static void
spread(int fd, void *buf, size_t count, off_t offset)
{
	char *dst = buf;
	size_t done = 0;

	while (done < count) {
		ssize_t n = pread(fd, dst + done, count - done,
		    offset + (off_t)done);
		if (n == -1) {
			/* An interrupted call transferred nothing; retry. */
			if (errno == EINTR)
				continue;
			(void) fprintf(stderr,
			    "Error while reading file: %s\n",
			    strerror(errno));
			exit(1);
		}
		if (n == 0) {
			/* End-of-file before the request was satisfied. */
			(void) fprintf(stderr,
			    "Error while reading file: short read\n");
			exit(1);
		}
		done += (size_t)n;
	}
}
105 
106 static int
107 dump_record(dmu_replay_record_t *drr, void *payload, int payload_len,
108     zio_cksum_t *zc, int outfd)
109 {
110 	assert(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum)
111 	    == sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
112 	fletcher_4_incremental_native(drr,
113 	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), zc);
114 	if (drr->drr_type != DRR_BEGIN) {
115 		assert(ZIO_CHECKSUM_IS_ZERO(&drr->drr_u.
116 		    drr_checksum.drr_checksum));
117 		drr->drr_u.drr_checksum.drr_checksum = *zc;
118 	}
119 	fletcher_4_incremental_native(&drr->drr_u.drr_checksum.drr_checksum,
120 	    sizeof (zio_cksum_t), zc);
121 	if (write(outfd, drr, sizeof (*drr)) == -1)
122 		return (errno);
123 	if (payload_len != 0) {
124 		fletcher_4_incremental_native(payload, payload_len, zc);
125 		if (write(outfd, payload, payload_len) == -1)
126 			return (errno);
127 	}
128 	return (0);
129 }
130 
131 static void
132 rdt_insert(redup_table_t *rdt,
133     uint64_t guid, uint64_t object, uint64_t offset, uint64_t stream_offset)
134 {
135 	uint64_t ch = cityhash3(guid, object, offset);
136 	uint64_t hashcode = BF64_GET(ch, 0, rdt->numhashbits);
137 	redup_entry_t **rdepp;
138 
139 	rdepp = &(rdt->redup_hash_array[hashcode]);
140 	redup_entry_t *rde = umem_cache_alloc(rdt->ddecache, UMEM_NOFAIL);
141 	rde->rde_next = *rdepp;
142 	rde->rde_guid = guid;
143 	rde->rde_object = object;
144 	rde->rde_offset = offset;
145 	rde->rde_stream_offset = stream_offset;
146 	*rdepp = rde;
147 	rdt->ddt_count++;
148 }
149 
150 static void
151 rdt_lookup(redup_table_t *rdt,
152     uint64_t guid, uint64_t object, uint64_t offset,
153     uint64_t *stream_offsetp)
154 {
155 	uint64_t ch = cityhash3(guid, object, offset);
156 	uint64_t hashcode = BF64_GET(ch, 0, rdt->numhashbits);
157 
158 	for (redup_entry_t *rde = rdt->redup_hash_array[hashcode];
159 	    rde != NULL; rde = rde->rde_next) {
160 		if (rde->rde_guid == guid &&
161 		    rde->rde_object == object &&
162 		    rde->rde_offset == offset) {
163 			*stream_offsetp = rde->rde_stream_offset;
164 			return;
165 		}
166 	}
167 	assert(!"could not find expected redup table entry");
168 }
169 
170 /*
171  * Convert a dedup stream (generated by "zfs send -D") to a
172  * non-deduplicated stream.  The entire infd will be converted, including
173  * any substreams in a stream package (generated by "zfs send -RD"). The
174  * infd must be seekable.
175  */
176 static void
177 zfs_redup_stream(int infd, int outfd, boolean_t verbose)
178 {
179 	int bufsz = SPA_MAXBLOCKSIZE;
180 	dmu_replay_record_t thedrr;
181 	dmu_replay_record_t *drr = &thedrr;
182 	redup_table_t rdt;
183 	zio_cksum_t stream_cksum;
184 	uint64_t numbuckets;
185 	uint64_t num_records = 0;
186 	uint64_t num_write_byref_records = 0;
187 
188 	memset(&thedrr, 0, sizeof (dmu_replay_record_t));
189 
190 #ifdef _ILP32
191 	uint64_t max_rde_size = SMALLEST_POSSIBLE_MAX_RDT_MB << 20;
192 #else
193 	uint64_t physmem = sysconf(_SC_PHYS_PAGES) * sysconf(_SC_PAGESIZE);
194 	uint64_t max_rde_size =
195 	    MAX((physmem * MAX_RDT_PHYSMEM_PERCENT) / 100,
196 	    SMALLEST_POSSIBLE_MAX_RDT_MB << 20);
197 #endif
198 
199 	numbuckets = max_rde_size / (sizeof (redup_entry_t));
200 
201 	/*
202 	 * numbuckets must be a power of 2.  Increase number to
203 	 * a power of 2 if necessary.
204 	 */
205 	if (!ISP2(numbuckets))
206 		numbuckets = 1ULL << highbit64(numbuckets);
207 
208 	rdt.redup_hash_array =
209 	    safe_calloc(numbuckets * sizeof (redup_entry_t *));
210 	rdt.ddecache = umem_cache_create("rde", sizeof (redup_entry_t), 0,
211 	    NULL, NULL, NULL, NULL, NULL, 0);
212 	rdt.numhashbits = highbit64(numbuckets) - 1;
213 	rdt.ddt_count = 0;
214 
215 	char *buf = safe_calloc(bufsz);
216 	FILE *ofp = fdopen(infd, "r");
217 	long offset = ftell(ofp);
218 	int begin = 0;
219 	boolean_t seen = B_FALSE;
220 	while (sfread(drr, sizeof (*drr), ofp) != 0) {
221 		num_records++;
222 
223 		/*
224 		 * We need to regenerate the checksum.
225 		 */
226 		if (drr->drr_type != DRR_BEGIN) {
227 			memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
228 			    sizeof (drr->drr_u.drr_checksum.drr_checksum));
229 		}
230 
231 		uint64_t payload_size = 0;
232 		switch (drr->drr_type) {
233 		case DRR_BEGIN:
234 		{
235 			struct drr_begin *drrb = &drr->drr_u.drr_begin;
236 			int fflags;
237 			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
238 			VERIFY0(begin++);
239 			seen = B_TRUE;
240 
241 			assert(drrb->drr_magic == DMU_BACKUP_MAGIC);
242 
243 			/* clear the DEDUP feature flag for this stream */
244 			fflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
245 			fflags &= ~(DMU_BACKUP_FEATURE_DEDUP |
246 			    DMU_BACKUP_FEATURE_DEDUPPROPS);
247 			/* cppcheck-suppress syntaxError */
248 			DMU_SET_FEATUREFLAGS(drrb->drr_versioninfo, fflags);
249 
250 			uint32_t sz = drr->drr_payloadlen;
251 
252 			VERIFY3U(sz, <=, 1U << 28);
253 
254 			if (sz != 0) {
255 				if (sz > bufsz) {
256 					free(buf);
257 					buf = safe_calloc(sz);
258 					bufsz = sz;
259 				}
260 				(void) sfread(buf, sz, ofp);
261 			}
262 			payload_size = sz;
263 			break;
264 		}
265 
266 		case DRR_END:
267 		{
268 			struct drr_end *drre = &drr->drr_u.drr_end;
269 			/*
270 			 * We would prefer to just check --begin == 0, but
271 			 * replication streams have an end of stream END
272 			 * record, so we must avoid tripping it.
273 			 */
274 			VERIFY3B(seen, ==, B_TRUE);
275 			begin--;
276 			/*
277 			 * Use the recalculated checksum, unless this is
278 			 * the END record of a stream package, which has
279 			 * no checksum.
280 			 */
281 			if (!ZIO_CHECKSUM_IS_ZERO(&drre->drr_checksum))
282 				drre->drr_checksum = stream_cksum;
283 			break;
284 		}
285 
286 		case DRR_OBJECT:
287 		{
288 			struct drr_object *drro = &drr->drr_u.drr_object;
289 			VERIFY3S(begin, ==, 1);
290 
291 			if (drro->drr_bonuslen > 0) {
292 				payload_size = DRR_OBJECT_PAYLOAD_SIZE(drro);
293 				(void) sfread(buf, payload_size, ofp);
294 			}
295 			break;
296 		}
297 
298 		case DRR_SPILL:
299 		{
300 			struct drr_spill *drrs = &drr->drr_u.drr_spill;
301 			VERIFY3S(begin, ==, 1);
302 			payload_size = DRR_SPILL_PAYLOAD_SIZE(drrs);
303 			(void) sfread(buf, payload_size, ofp);
304 			break;
305 		}
306 
307 		case DRR_WRITE_BYREF:
308 		{
309 			struct drr_write_byref drrwb =
310 			    drr->drr_u.drr_write_byref;
311 			VERIFY3S(begin, ==, 1);
312 
313 			num_write_byref_records++;
314 
315 			/*
316 			 * Look up in hash table by drrwb->drr_refguid,
317 			 * drr_refobject, drr_refoffset.  Replace this
318 			 * record with the found WRITE record, but with
319 			 * drr_object,drr_offset,drr_toguid replaced with ours.
320 			 */
321 			uint64_t stream_offset = 0;
322 			rdt_lookup(&rdt, drrwb.drr_refguid,
323 			    drrwb.drr_refobject, drrwb.drr_refoffset,
324 			    &stream_offset);
325 
326 			spread(infd, drr, sizeof (*drr), stream_offset);
327 
328 			assert(drr->drr_type == DRR_WRITE);
329 			struct drr_write *drrw = &drr->drr_u.drr_write;
330 			assert(drrw->drr_toguid == drrwb.drr_refguid);
331 			assert(drrw->drr_object == drrwb.drr_refobject);
332 			assert(drrw->drr_offset == drrwb.drr_refoffset);
333 
334 			payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw);
335 			spread(infd, buf, payload_size,
336 			    stream_offset + sizeof (*drr));
337 
338 			drrw->drr_toguid = drrwb.drr_toguid;
339 			drrw->drr_object = drrwb.drr_object;
340 			drrw->drr_offset = drrwb.drr_offset;
341 			break;
342 		}
343 
344 		case DRR_WRITE:
345 		{
346 			struct drr_write *drrw = &drr->drr_u.drr_write;
347 			VERIFY3S(begin, ==, 1);
348 			payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw);
349 			(void) sfread(buf, payload_size, ofp);
350 
351 			rdt_insert(&rdt, drrw->drr_toguid,
352 			    drrw->drr_object, drrw->drr_offset, offset);
353 			break;
354 		}
355 
356 		case DRR_WRITE_EMBEDDED:
357 		{
358 			struct drr_write_embedded *drrwe =
359 			    &drr->drr_u.drr_write_embedded;
360 			VERIFY3S(begin, ==, 1);
361 			payload_size =
362 			    P2ROUNDUP((uint64_t)drrwe->drr_psize, 8);
363 			(void) sfread(buf, payload_size, ofp);
364 			break;
365 		}
366 
367 		case DRR_FREEOBJECTS:
368 		case DRR_FREE:
369 		case DRR_OBJECT_RANGE:
370 			VERIFY3S(begin, ==, 1);
371 			break;
372 
373 		default:
374 			(void) fprintf(stderr, "INVALID record type 0x%x\n",
375 			    drr->drr_type);
376 			/* should never happen, so assert */
377 			assert(B_FALSE);
378 		}
379 
380 		if (feof(ofp)) {
381 			fprintf(stderr, "Error: unexpected end-of-file\n");
382 			exit(1);
383 		}
384 		if (ferror(ofp)) {
385 			fprintf(stderr, "Error while reading file: %s\n",
386 			    strerror(errno));
387 			exit(1);
388 		}
389 
390 		/*
391 		 * We need to recalculate the checksum, and it needs to be
392 		 * initially zero to do that.  BEGIN records don't have
393 		 * a checksum.
394 		 */
395 		if (drr->drr_type != DRR_BEGIN) {
396 			memset(&drr->drr_u.drr_checksum.drr_checksum, 0,
397 			    sizeof (drr->drr_u.drr_checksum.drr_checksum));
398 		}
399 		if (dump_record(drr, buf, payload_size,
400 		    &stream_cksum, outfd) != 0)
401 			break;
402 		if (drr->drr_type == DRR_END) {
403 			/*
404 			 * Typically the END record is either the last
405 			 * thing in the stream, or it is followed
406 			 * by a BEGIN record (which also zeros the checksum).
407 			 * However, a stream package ends with two END
408 			 * records.  The last END record's checksum starts
409 			 * from zero.
410 			 */
411 			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
412 		}
413 		offset = ftell(ofp);
414 	}
415 
416 	if (verbose) {
417 		char mem_str[16];
418 		zfs_nicenum(rdt.ddt_count * sizeof (redup_entry_t),
419 		    mem_str, sizeof (mem_str));
420 		fprintf(stderr, "converted stream with %llu total records, "
421 		    "including %llu dedup records, using %sB memory.\n",
422 		    (long long)num_records,
423 		    (long long)num_write_byref_records,
424 		    mem_str);
425 	}
426 
427 	umem_cache_destroy(rdt.ddecache);
428 	free(rdt.redup_hash_array);
429 	free(buf);
430 	(void) fclose(ofp);
431 }
432 
433 int
434 zstream_do_redup(int argc, char *argv[])
435 {
436 	boolean_t verbose = B_FALSE;
437 	int c;
438 
439 	while ((c = getopt(argc, argv, "v")) != -1) {
440 		switch (c) {
441 		case 'v':
442 			verbose = B_TRUE;
443 			break;
444 		case '?':
445 			(void) fprintf(stderr, "invalid option '%c'\n",
446 			    optopt);
447 			zstream_usage();
448 			break;
449 		}
450 	}
451 
452 	argc -= optind;
453 	argv += optind;
454 
455 	if (argc != 1)
456 		zstream_usage();
457 
458 	const char *filename = argv[0];
459 
460 	if (isatty(STDOUT_FILENO)) {
461 		(void) fprintf(stderr,
462 		    "Error: Stream can not be written to a terminal.\n"
463 		    "You must redirect standard output.\n");
464 		return (1);
465 	}
466 
467 	int fd = open(filename, O_RDONLY);
468 	if (fd == -1) {
469 		(void) fprintf(stderr,
470 		    "Error while opening file '%s': %s\n",
471 		    filename, strerror(errno));
472 		exit(1);
473 	}
474 
475 	fletcher_4_init();
476 	zfs_redup_stream(fd, STDOUT_FILENO, verbose);
477 	fletcher_4_fini();
478 
479 	close(fd);
480 
481 	return (0);
482 }
483