xref: /freebsd/sys/contrib/openzfs/cmd/zstream/zstream_io.c (revision d0b3ecdc274930e190ea233b6b69ff03782eaf8d)
1 // SPDX-License-Identifier: CDDL-1.0
2 /*
3  * CDDL HEADER START
4  *
5  * This file and its contents are supplied under the terms of the Common
6  * Development and Distribution License ("CDDL"), version 1.0. You may only use
7  * this file in accordance with the terms of version 1.0 of the CDDL.
8  *
9  * A full copy of the text of the CDDL should have accompanied this source. A
10  * copy of the CDDL is also available via the Internet at
11  * http://www.illumos.org/license/CDDL.
12  *
13  * CDDL HEADER END
14  */
15 
16 /*
17  * Copyright (c) 2026 by Garth Snyder. All rights reserved.
18  */
19 
20 #include <arpa/inet.h>
21 #include <err.h>
22 #include <errno.h>
23 #include <libzutil.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <sys/byteorder.h>
28 #include <sys/stdtypes.h>
29 #include <sys/sysmacros.h>
30 #include <sys/types.h>
31 #include <sys/zfs_ioctl.h>
32 #include <time.h>
33 #include <unistd.h>
34 
35 #include "zstream_chain.h"
36 #include "zstream_modules.h"
37 #include "zstream_util.h"
38 
39 /*
40  * Init only the filename; chain functions will prepare the FILE *
41  */
42 typedef struct {
43 	const char	*ic_filename;
44 	FILE		*ic_fp;
45 	boolean_t	ic_for_reading;
46 	off_t		ic_offset;
47 } io_context_t;
48 
/*
 * State for one throughput checkpoint placed in a chain.
 */
typedef struct {
	/* Label printed with every report. */
	const char	*cc_name;
	/* Monotonic time of the last sample, in seconds; 0 before the first. */
	double		cc_last_sec;
	/* Minimum number of seconds between two reports. */
	double		cc_period_sec;
	/* Stream offset recorded with the last sample. */
	uint64_t	cc_last_bytes;
} checkpoint_context_t;
55 
56 static io_context_t io_contexts[MAX_IO_STREAMS];
57 static int next_io_context = 0;
58 
59 static checkpoint_context_t checkpoint_contexts[MAX_IO_STREAMS];
60 static int next_checkpoint_context = 0;
61 
62 /*
63  * Run from within chain execution to initialize I/O. A NULL filename
64  * indicates stdin or stdout.
65  */
66 static void
open_file(io_context_t * context)67 open_file(io_context_t *context)
68 {
69 	if (context->ic_filename) {
70 		context->ic_fp = fopen(context->ic_filename,
71 		    context->ic_for_reading ? "rb" : "wb+");
72 		if (!context->ic_fp) {
73 			perror(context->ic_filename);
74 			exit(1);
75 		}
76 	} else if (context->ic_for_reading && isatty(STDIN_FILENO)) {
77 		errx(1, "stream cannot be read from a terminal. "
78 		    "Name a file or take input from a pipe.");
79 	} else if (context->ic_for_reading) {
80 		context->ic_fp = stdin;
81 	} else if (isatty(STDOUT_FILENO)) {
82 		errx(1, "stream cannot be written to a terminal. "
83 		    "Capture output to a file or pipe to another command.");
84 	} else {
85 		context->ic_fp = stdout;
86 	}
87 }
88 
89 /*
90  * Extract the payload size from a replay record that is potentially
91  * byteswapped. We want to leave the bulk of byteswapping to another module,
92  * so just take a quick, nondestructive peek.
93  *
94  * Record-specific macros such as DRR_WRITE_PAYLOAD_SIZE do not seem to be
95  * byteswap-aware. However, with the exception of DRR_OBJECT_PAYLOAD_SIZE,
96  * they happen to work with post-swapping since they are switching on either
97  * a uint8_t value or 0.
98  *
99  * DRR_WRITE and DRR_SPILL use 64-bit sizes. The other two record types have
100  * 32-bit sizes. The drr_payloadlen field shared by all record types (but
101  * used only by BEGIN records is also 32 bits.
102  */
103 static size_t
calc_payload_size(dmu_replay_record_t * drr)104 calc_payload_size(dmu_replay_record_t *drr)
105 {
106 	struct drr_object *drro		 = &drr->drr_u.drr_object;
107 	struct drr_write *drrw		 = &drr->drr_u.drr_write;
108 	struct drr_spill *drrs		 = &drr->drr_u.drr_spill;
109 	struct drr_write_embedded *drrwe = &drr->drr_u.drr_write_embedded;
110 
111 	boolean_t swap = ATTR_IS_SET(CA_BYTESWAPPED);
112 	uint32_t drr_type = swap ? BSWAP_32(drr->drr_type) : drr->drr_type;
113 	uint64_t size, size64 = 0;
114 	uint32_t size32 = 0;
115 	boolean_t round = B_FALSE;
116 
117 	if (drr_type == DRR_OBJECT) {
118 		round = drro->drr_raw_bonuslen == 0;
119 		size32 = round ? drro->drr_bonuslen : drro->drr_raw_bonuslen;
120 	} else if (drr_type == DRR_WRITE) {
121 		size64 = DRR_WRITE_PAYLOAD_SIZE(drrw);
122 	} else if (drr_type == DRR_SPILL) {
123 		size64 = DRR_SPILL_PAYLOAD_SIZE(drrs);
124 	} else if (drr_type == DRR_WRITE_EMBEDDED) {
125 		size32 = drrwe->drr_psize;
126 		round = B_TRUE;
127 	} else if (drr_type == DRR_BEGIN) {
128 		size32 = drr->drr_payloadlen;
129 	} else {
130 		return (0);
131 	}
132 	if (size32 != 0) {
133 		size = swap ? BSWAP_32(size32) : size32;
134 	} else {
135 		size = swap ? BSWAP_64(size64) : size64;
136 	}
137 	return (round ? P2ROUNDUP(size, 8) : size);
138 }
139 
140 /*
141  * Must be called only with the first record in a stream. Must be a
142  * DRR_BEGIN record or we'll terminate with "invalid stream".
143  */
144 static void
set_stream_attributes(drr_packet_t * item)145 set_stream_attributes(drr_packet_t *item)
146 {
147 	dmu_replay_record_t *drr  = &item->dp_drr;
148 	struct drr_begin *drrb    = &drr->drr_u.drr_begin;
149 	uint64_t magic		  = drrb->drr_magic;
150 	uint64_t versioninfo	  = drrb->drr_versioninfo;
151 	boolean_t i_am_big_endian = htonl(0xFF00) == 0xFF00;
152 
153 	boolean_t swap_on_output, is_deduped;
154 
155 	if (magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
156 		SET_ATTR(CA_BYTESWAPPED);
157 		versioninfo = BSWAP_64(versioninfo);
158 	} else if (magic != DMU_BACKUP_MAGIC) {
159 		errx(1, "invalid ZFS stream, bad magic number %llx",
160 		    (u_longlong_t)magic);
161 	}
162 	if (i_am_big_endian == ATTR_IS_SET(CA_BYTESWAPPED)) {
163 		SET_ATTR(CA_LITTLE_ENDIAN_INPUT);
164 	} else {
165 		SET_ATTR(CA_BIG_ENDIAN_INPUT);
166 	}
167 	chain_attrs->ca_feature_flags = DMU_GET_FEATUREFLAGS(versioninfo);
168 
169 	is_deduped =
170 	    STREAM_HAS_FEATURE(DMU_BACKUP_FEATURE_DEDUP) ||
171 	    STREAM_HAS_FEATURE(DMU_BACKUP_FEATURE_DEDUPPROPS);
172 
173 	if (OPTION_ENABLED(CA_FORBID_DEDUP) && is_deduped) {
174 		errx(1, "input stream is deduplicated, but this subcommand "
175 		    "does not support deduplicated streams. Use 'zstream "
176 		    "redup' to reduplicate.");
177 	}
178 	boolean_t req_dedup = OPTION_ENABLED(CA_REQUIRE_DEDUP);
179 	boolean_t is_dedup = STREAM_HAS_FEATURE(DMU_BACKUP_FEATURE_DEDUP);
180 	if (req_dedup && !is_dedup) {
181 		errx(1, "this subcommand requires a deduplicated input "
182 		    "stream, but the stream is not deduplicated");
183 	}
184 	boolean_t req_native = OPTION_ENABLED(CA_REQUIRE_NATIVE_ENDIAN);
185 	boolean_t is_byteswapped = ATTR_IS_SET(CA_BYTESWAPPED);
186 	if (req_native && is_byteswapped) {
187 		errx(1, "this subcommand requires a native-endian "
188 		    "input stream");
189 	}
190 
191 	/*
192 	 * Figure out output endianness. In the absence of explicit byte
193 	 * order instructions, we default to preserving the input byte
194 	 * order. Record headers are always converted to native byte order
195 	 * for processing, but they can be swapped back on output.
196 	 *
197 	 * zfs receive inspects the endianness of each DRR record
198 	 * and assumes, at least in some cases, that payload data has the
199 	 * same order as the DMU wrappers.
200 	 */
201 	if (OPTION_ENABLED(CA_BIG_ENDIAN_OUT))
202 		swap_on_output = !i_am_big_endian;
203 	else if (OPTION_ENABLED(CA_LITTLE_ENDIAN_OUT))
204 		swap_on_output = i_am_big_endian;
205 	else if (OPTION_ENABLED(CA_OPPOSITE_ENDIAN_OUT))
206 		swap_on_output = !ATTR_IS_SET(CA_BYTESWAPPED);
207 	else
208 		swap_on_output = ATTR_IS_SET(CA_BYTESWAPPED);
209 
210 	if (swap_on_output) {
211 		ENABLE_OPTION(chain_attrs, CA_BYTESWAP_ON_OUTPUT);
212 	}
213 }
214 
215 static disposition_t
chain_read(drr_packet_t * item,io_context_t * context)216 chain_read(drr_packet_t *item, io_context_t *context)
217 {
218 	if (item == NULL)
219 		return (D_OK);
220 
221 	dmu_replay_record_t *drr = &item->dp_drr;
222 
223 	if (!context->ic_fp)
224 		open_file(context);
225 
226 	if (fread(drr, sizeof (dmu_replay_record_t), 1, context->ic_fp) != 1) {
227 		if (ferror(context->ic_fp)) {
228 			err(1, "error reading record header at offset %llu",
229 			    (u_longlong_t)context->ic_offset);
230 		}
231 		fclose(context->ic_fp);
232 		return (D_EOF);
233 	}
234 
235 	if (context->ic_offset == 0)
236 		set_stream_attributes(item);
237 
238 	size_t payload_size = calc_payload_size(&item->dp_drr);
239 	if (payload_size > UINT32_MAX) {
240 		errx(1, "stated packet size is greater than uint32_t"
241 		    "at offset %llu", (u_longlong_t)context->ic_offset);
242 	}
243 	item->dp_payload_size = payload_size;
244 	if (item->dp_payload_size > 0) {
245 		item->dp_payload = safe_malloc(item->dp_payload_size);
246 		size_t n_read = fread(item->dp_payload, item->dp_payload_size,
247 		    1, context->ic_fp);
248 		if (n_read != 1) {
249 			if (ferror(context->ic_fp)) {
250 				err(1, "error reading record payload at "
251 				    " offset %llu",
252 				    (u_longlong_t)context->ic_offset);
253 			} else {
254 				/*
255 				 * We can't exit here because the ZFS test
256 				 * suite depends on being able to process
257 				 * streams truncated at random places.
258 				 */
259 				warnx("input ends mid-record at offset %llu "
260 				    "- stream is likely corrupt",
261 				    (u_longlong_t)context->ic_offset);
262 				fclose(context->ic_fp);
263 				free(item->dp_payload);
264 				return (D_EOF);
265 			}
266 		}
267 	} else {
268 		item->dp_payload = NULL;
269 	}
270 	item->dp_stream_offset = context->ic_offset;
271 
272 	uint32_t drr_type = ATTR_IS_SET(CA_BYTESWAPPED) ?
273 	    BSWAP_32(drr->drr_type) : drr->drr_type;
274 
275 	if (drr_type >= DRR_NUMTYPES) {
276 		err(1, "invalid record type %llu found at offset %llu",
277 		    (u_longlong_t)drr_type, (u_longlong_t)context->ic_offset);
278 	}
279 
280 	context->ic_offset += sizeof (*drr) + item->dp_payload_size;
281 
282 	record_stats_t *stats = &chain_attrs->ca_stats_in[drr_type];
283 	stats->rs_num_records++;
284 	stats->rs_total_header_bytes += sizeof (dmu_replay_record_t);
285 	stats->rs_total_payload_bytes += item->dp_payload_size;
286 
287 	stats = &chain_attrs->ca_totals_in;
288 	stats->rs_num_records++;
289 	stats->rs_total_header_bytes += sizeof (dmu_replay_record_t);
290 	stats->rs_total_payload_bytes += item->dp_payload_size;
291 
292 	return (D_OK);
293 }
294 
295 static disposition_t
chain_write(drr_packet_t * item,io_context_t * context)296 chain_write(drr_packet_t *item, io_context_t *context)
297 {
298 	if (item == NULL) {
299 		if (context->ic_fp) {
300 			if (fclose(context->ic_fp) != 0)
301 				err(1, "error closing output stream");
302 			context->ic_fp = NULL;
303 		}
304 		return (D_OK);
305 	}
306 
307 	if (!context->ic_fp) {
308 		open_file(context);
309 	}
310 
311 	dmu_replay_record_t *drr = &item->dp_drr;
312 
313 	if (fwrite(drr, sizeof (dmu_replay_record_t), 1, context->ic_fp) != 1) {
314 		err(1, "error writing record header");
315 	} else if (item->dp_payload_size > 0) {
316 		size_t n_written = fwrite(item->dp_payload,
317 		    item->dp_payload_size, 1, context->ic_fp);
318 		if (n_written != 1) {
319 			err(1, "error writing payload");
320 		} else {
321 			free(item->dp_payload);
322 			item->dp_payload = NULL;
323 		}
324 	}
325 
326 	uint32_t drr_type = OPTION_ENABLED(CA_BYTESWAP_ON_OUTPUT) ?
327 	    BSWAP_32(drr->drr_type) : drr->drr_type;
328 
329 	record_stats_t *stats = &chain_attrs->ca_stats_out[drr_type];
330 	stats->rs_num_records++;
331 	stats->rs_total_header_bytes += sizeof (dmu_replay_record_t);
332 	stats->rs_total_payload_bytes += item->dp_payload_size;
333 
334 	stats = &chain_attrs->ca_totals_out;
335 	stats->rs_num_records++;
336 	stats->rs_total_header_bytes += sizeof (dmu_replay_record_t);
337 	stats->rs_total_payload_bytes += item->dp_payload_size;
338 
339 	return (D_OK);
340 }
341 
342 /*
343  * Even if the chain doesn't write out a stream, payloads still need freed.
344  */
345 static disposition_t
chain_null_output(drr_packet_t * item,void * context)346 chain_null_output(drr_packet_t *item, void *context)
347 {
348 	(void) context;
349 	if (item && item->dp_payload != NULL && item->dp_payload_size > 0) {
350 		free(item->dp_payload);
351 		item->dp_payload = NULL;
352 		item->dp_payload_size = 0;
353 	}
354 	return (D_OK);
355 }
356 
357 /*
358  * Storage for the filename must remain valid during chain execution
359  */
360 static chain_step_t
setup_io(const char * filename,boolean_t for_reading)361 setup_io(const char *filename, boolean_t for_reading)
362 {
363 	int context_num = next_io_context++ % MAX_IO_STREAMS;
364 
365 	io_context_t context = {
366 		.ic_filename = filename,
367 		.ic_for_reading = for_reading
368 	};
369 	io_contexts[context_num] = context;
370 
371 	chain_step_t step = {
372 		.cs_type = CS_SERIAL,
373 		.cs_in_size = 0,
374 		.cs_out_size = sizeof (drr_packet_t),
375 		.cs_context = &io_contexts[context_num],
376 		.cs_serial = {
377 			.process = (zc_serial_process_f *)
378 			    (for_reading ? chain_read : chain_write),
379 		}
380 	};
381 	return (step);
382 }
383 
384 chain_step_t
serial_read_stream(const char * filename)385 serial_read_stream(const char *filename)
386 {
387 	return (setup_io(filename, B_TRUE));
388 }
389 
390 chain_step_t
serial_write_stream(const char * filename)391 serial_write_stream(const char *filename)
392 {
393 	return (setup_io(filename, B_FALSE));
394 }
395 
396 chain_step_t
serial_null_output(void)397 serial_null_output(void)
398 {
399 	chain_step_t step = {
400 		.cs_type = CS_SERIAL,
401 		.cs_in_size = sizeof (drr_packet_t),
402 		.cs_out_size = 0,
403 		.cs_context = NULL,
404 		.cs_serial = {
405 			.process = (zc_serial_process_f *)chain_null_output
406 		}
407 	};
408 	return (step);
409 }
410 
411 static disposition_t
chain_checkpoint(drr_packet_t * item,checkpoint_context_t * ctxt)412 chain_checkpoint(drr_packet_t *item, checkpoint_context_t *ctxt)
413 {
414 	struct timespec now;
415 	char buff[32];
416 	uint64_t delta_b, dbdt;
417 	double now_sec, delta_t;
418 
419 	if (item == NULL)
420 		return (D_OK);
421 
422 	clock_gettime(CLOCK_MONOTONIC, &now);
423 	now_sec = now.tv_sec + (double)now.tv_nsec / 1E9;
424 	if (ctxt->cc_last_sec > 1E-9) {
425 		delta_t = now_sec - ctxt->cc_last_sec;
426 		if (delta_t < ctxt->cc_period_sec)
427 			return (D_OK);
428 		delta_b = item->dp_stream_offset - ctxt->cc_last_bytes;
429 		dbdt = delta_b / delta_t;
430 		zfs_nicenum(dbdt, buff, sizeof (buff));
431 		fprintf(stderr, "Checkpoint %s: %s/s\n", ctxt->cc_name, buff);
432 	}
433 	ctxt->cc_last_sec = now_sec;
434 	ctxt->cc_last_bytes = item->dp_stream_offset;
435 	return (D_OK);
436 }
437 
438 /*
439  * Storage for name must remain valid throughout chain execution
440  */
441 chain_step_t
serial_checkpoint(const char * name)442 serial_checkpoint(const char *name)
443 {
444 	int context_no = next_checkpoint_context++ % MAX_IO_STREAMS;
445 
446 	checkpoint_context_t context = {
447 		.cc_name = name,
448 		.cc_period_sec = 1.0
449 	};
450 	checkpoint_contexts[context_no] = context;
451 
452 	chain_step_t step = {
453 		.cs_type = CS_SERIAL,
454 		.cs_in_size = sizeof (drr_packet_t),
455 		.cs_out_size = sizeof (drr_packet_t),
456 		.cs_context = &checkpoint_contexts[context_no],
457 		.cs_serial = {
458 			.process = (zc_serial_process_f *)chain_checkpoint
459 		},
460 	};
461 	return (step);
462 }
463