1 // SPDX-License-Identifier: CDDL-1.0
2 /*
3 * CDDL HEADER START
4 *
5 * This file and its contents are supplied under the terms of the Common
6 * Development and Distribution License ("CDDL"), version 1.0. You may only use
7 * this file in accordance with the terms of version 1.0 of the CDDL.
8 *
9 * A full copy of the text of the CDDL should have accompanied this source. A
10 * copy of the CDDL is also available via the Internet at
11 * http://www.illumos.org/license/CDDL.
12 *
13 * CDDL HEADER END
14 */
15
16 /*
17 * Copyright (c) 2026 by Garth Snyder. All rights reserved.
18 */
19
20 #include <arpa/inet.h>
21 #include <err.h>
22 #include <errno.h>
23 #include <libzutil.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <sys/byteorder.h>
28 #include <sys/stdtypes.h>
29 #include <sys/sysmacros.h>
30 #include <sys/types.h>
31 #include <sys/zfs_ioctl.h>
32 #include <time.h>
33 #include <unistd.h>
34
35 #include "zstream_chain.h"
36 #include "zstream_modules.h"
37 #include "zstream_util.h"
38
39 /*
40 * Init only the filename; chain functions will prepare the FILE *
41 */
42 typedef struct {
43 const char *ic_filename;
44 FILE *ic_fp;
45 boolean_t ic_for_reading;
46 off_t ic_offset;
47 } io_context_t;
48
/*
 * State for one throughput checkpoint: its label, how often it may
 * report, and the time and stream offset captured at the last sample.
 */
typedef struct {
	const char	*cc_name;	/* label printed with each report */
	double		cc_last_sec;	/* monotonic time of last sample; 0 = none */
	double		cc_period_sec;	/* minimum seconds between reports */
	uint64_t	cc_last_bytes;	/* stream offset at last sample */
} checkpoint_context_t;
55
56 static io_context_t io_contexts[MAX_IO_STREAMS];
57 static int next_io_context = 0;
58
59 static checkpoint_context_t checkpoint_contexts[MAX_IO_STREAMS];
60 static int next_checkpoint_context = 0;
61
62 /*
63 * Run from within chain execution to initialize I/O. A NULL filename
64 * indicates stdin or stdout.
65 */
66 static void
open_file(io_context_t * context)67 open_file(io_context_t *context)
68 {
69 if (context->ic_filename) {
70 context->ic_fp = fopen(context->ic_filename,
71 context->ic_for_reading ? "rb" : "wb+");
72 if (!context->ic_fp) {
73 perror(context->ic_filename);
74 exit(1);
75 }
76 } else if (context->ic_for_reading && isatty(STDIN_FILENO)) {
77 errx(1, "stream cannot be read from a terminal. "
78 "Name a file or take input from a pipe.");
79 } else if (context->ic_for_reading) {
80 context->ic_fp = stdin;
81 } else if (isatty(STDOUT_FILENO)) {
82 errx(1, "stream cannot be written to a terminal. "
83 "Capture output to a file or pipe to another command.");
84 } else {
85 context->ic_fp = stdout;
86 }
87 }
88
89 /*
90 * Extract the payload size from a replay record that is potentially
91 * byteswapped. We want to leave the bulk of byteswapping to another module,
92 * so just take a quick, nondestructive peek.
93 *
94 * Record-specific macros such as DRR_WRITE_PAYLOAD_SIZE do not seem to be
95 * byteswap-aware. However, with the exception of DRR_OBJECT_PAYLOAD_SIZE,
96 * they happen to work with post-swapping since they are switching on either
97 * a uint8_t value or 0.
98 *
99 * DRR_WRITE and DRR_SPILL use 64-bit sizes. The other two record types have
100 * 32-bit sizes. The drr_payloadlen field shared by all record types (but
101 * used only by BEGIN records is also 32 bits.
102 */
103 static size_t
calc_payload_size(dmu_replay_record_t * drr)104 calc_payload_size(dmu_replay_record_t *drr)
105 {
106 struct drr_object *drro = &drr->drr_u.drr_object;
107 struct drr_write *drrw = &drr->drr_u.drr_write;
108 struct drr_spill *drrs = &drr->drr_u.drr_spill;
109 struct drr_write_embedded *drrwe = &drr->drr_u.drr_write_embedded;
110
111 boolean_t swap = ATTR_IS_SET(CA_BYTESWAPPED);
112 uint32_t drr_type = swap ? BSWAP_32(drr->drr_type) : drr->drr_type;
113 uint64_t size, size64 = 0;
114 uint32_t size32 = 0;
115 boolean_t round = B_FALSE;
116
117 if (drr_type == DRR_OBJECT) {
118 round = drro->drr_raw_bonuslen == 0;
119 size32 = round ? drro->drr_bonuslen : drro->drr_raw_bonuslen;
120 } else if (drr_type == DRR_WRITE) {
121 size64 = DRR_WRITE_PAYLOAD_SIZE(drrw);
122 } else if (drr_type == DRR_SPILL) {
123 size64 = DRR_SPILL_PAYLOAD_SIZE(drrs);
124 } else if (drr_type == DRR_WRITE_EMBEDDED) {
125 size32 = drrwe->drr_psize;
126 round = B_TRUE;
127 } else if (drr_type == DRR_BEGIN) {
128 size32 = drr->drr_payloadlen;
129 } else {
130 return (0);
131 }
132 if (size32 != 0) {
133 size = swap ? BSWAP_32(size32) : size32;
134 } else {
135 size = swap ? BSWAP_64(size64) : size64;
136 }
137 return (round ? P2ROUNDUP(size, 8) : size);
138 }
139
140 /*
141 * Must be called only with the first record in a stream. Must be a
142 * DRR_BEGIN record or we'll terminate with "invalid stream".
143 */
144 static void
set_stream_attributes(drr_packet_t * item)145 set_stream_attributes(drr_packet_t *item)
146 {
147 dmu_replay_record_t *drr = &item->dp_drr;
148 struct drr_begin *drrb = &drr->drr_u.drr_begin;
149 uint64_t magic = drrb->drr_magic;
150 uint64_t versioninfo = drrb->drr_versioninfo;
151 boolean_t i_am_big_endian = htonl(0xFF00) == 0xFF00;
152
153 boolean_t swap_on_output, is_deduped;
154
155 if (magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
156 SET_ATTR(CA_BYTESWAPPED);
157 versioninfo = BSWAP_64(versioninfo);
158 } else if (magic != DMU_BACKUP_MAGIC) {
159 errx(1, "invalid ZFS stream, bad magic number %llx",
160 (u_longlong_t)magic);
161 }
162 if (i_am_big_endian == ATTR_IS_SET(CA_BYTESWAPPED)) {
163 SET_ATTR(CA_LITTLE_ENDIAN_INPUT);
164 } else {
165 SET_ATTR(CA_BIG_ENDIAN_INPUT);
166 }
167 chain_attrs->ca_feature_flags = DMU_GET_FEATUREFLAGS(versioninfo);
168
169 is_deduped =
170 STREAM_HAS_FEATURE(DMU_BACKUP_FEATURE_DEDUP) ||
171 STREAM_HAS_FEATURE(DMU_BACKUP_FEATURE_DEDUPPROPS);
172
173 if (OPTION_ENABLED(CA_FORBID_DEDUP) && is_deduped) {
174 errx(1, "input stream is deduplicated, but this subcommand "
175 "does not support deduplicated streams. Use 'zstream "
176 "redup' to reduplicate.");
177 }
178 boolean_t req_dedup = OPTION_ENABLED(CA_REQUIRE_DEDUP);
179 boolean_t is_dedup = STREAM_HAS_FEATURE(DMU_BACKUP_FEATURE_DEDUP);
180 if (req_dedup && !is_dedup) {
181 errx(1, "this subcommand requires a deduplicated input "
182 "stream, but the stream is not deduplicated");
183 }
184 boolean_t req_native = OPTION_ENABLED(CA_REQUIRE_NATIVE_ENDIAN);
185 boolean_t is_byteswapped = ATTR_IS_SET(CA_BYTESWAPPED);
186 if (req_native && is_byteswapped) {
187 errx(1, "this subcommand requires a native-endian "
188 "input stream");
189 }
190
191 /*
192 * Figure out output endianness. In the absence of explicit byte
193 * order instructions, we default to preserving the input byte
194 * order. Record headers are always converted to native byte order
195 * for processing, but they can be swapped back on output.
196 *
197 * zfs receive inspects the endianness of each DRR record
198 * and assumes, at least in some cases, that payload data has the
199 * same order as the DMU wrappers.
200 */
201 if (OPTION_ENABLED(CA_BIG_ENDIAN_OUT))
202 swap_on_output = !i_am_big_endian;
203 else if (OPTION_ENABLED(CA_LITTLE_ENDIAN_OUT))
204 swap_on_output = i_am_big_endian;
205 else if (OPTION_ENABLED(CA_OPPOSITE_ENDIAN_OUT))
206 swap_on_output = !ATTR_IS_SET(CA_BYTESWAPPED);
207 else
208 swap_on_output = ATTR_IS_SET(CA_BYTESWAPPED);
209
210 if (swap_on_output) {
211 ENABLE_OPTION(chain_attrs, CA_BYTESWAP_ON_OUTPUT);
212 }
213 }
214
215 static disposition_t
chain_read(drr_packet_t * item,io_context_t * context)216 chain_read(drr_packet_t *item, io_context_t *context)
217 {
218 if (item == NULL)
219 return (D_OK);
220
221 dmu_replay_record_t *drr = &item->dp_drr;
222
223 if (!context->ic_fp)
224 open_file(context);
225
226 if (fread(drr, sizeof (dmu_replay_record_t), 1, context->ic_fp) != 1) {
227 if (ferror(context->ic_fp)) {
228 err(1, "error reading record header at offset %llu",
229 (u_longlong_t)context->ic_offset);
230 }
231 fclose(context->ic_fp);
232 return (D_EOF);
233 }
234
235 if (context->ic_offset == 0)
236 set_stream_attributes(item);
237
238 size_t payload_size = calc_payload_size(&item->dp_drr);
239 if (payload_size > UINT32_MAX) {
240 errx(1, "stated packet size is greater than uint32_t"
241 "at offset %llu", (u_longlong_t)context->ic_offset);
242 }
243 item->dp_payload_size = payload_size;
244 if (item->dp_payload_size > 0) {
245 item->dp_payload = safe_malloc(item->dp_payload_size);
246 size_t n_read = fread(item->dp_payload, item->dp_payload_size,
247 1, context->ic_fp);
248 if (n_read != 1) {
249 if (ferror(context->ic_fp)) {
250 err(1, "error reading record payload at "
251 " offset %llu",
252 (u_longlong_t)context->ic_offset);
253 } else {
254 /*
255 * We can't exit here because the ZFS test
256 * suite depends on being able to process
257 * streams truncated at random places.
258 */
259 warnx("input ends mid-record at offset %llu "
260 "- stream is likely corrupt",
261 (u_longlong_t)context->ic_offset);
262 fclose(context->ic_fp);
263 free(item->dp_payload);
264 return (D_EOF);
265 }
266 }
267 } else {
268 item->dp_payload = NULL;
269 }
270 item->dp_stream_offset = context->ic_offset;
271
272 uint32_t drr_type = ATTR_IS_SET(CA_BYTESWAPPED) ?
273 BSWAP_32(drr->drr_type) : drr->drr_type;
274
275 if (drr_type >= DRR_NUMTYPES) {
276 err(1, "invalid record type %llu found at offset %llu",
277 (u_longlong_t)drr_type, (u_longlong_t)context->ic_offset);
278 }
279
280 context->ic_offset += sizeof (*drr) + item->dp_payload_size;
281
282 record_stats_t *stats = &chain_attrs->ca_stats_in[drr_type];
283 stats->rs_num_records++;
284 stats->rs_total_header_bytes += sizeof (dmu_replay_record_t);
285 stats->rs_total_payload_bytes += item->dp_payload_size;
286
287 stats = &chain_attrs->ca_totals_in;
288 stats->rs_num_records++;
289 stats->rs_total_header_bytes += sizeof (dmu_replay_record_t);
290 stats->rs_total_payload_bytes += item->dp_payload_size;
291
292 return (D_OK);
293 }
294
295 static disposition_t
chain_write(drr_packet_t * item,io_context_t * context)296 chain_write(drr_packet_t *item, io_context_t *context)
297 {
298 if (item == NULL) {
299 if (context->ic_fp) {
300 if (fclose(context->ic_fp) != 0)
301 err(1, "error closing output stream");
302 context->ic_fp = NULL;
303 }
304 return (D_OK);
305 }
306
307 if (!context->ic_fp) {
308 open_file(context);
309 }
310
311 dmu_replay_record_t *drr = &item->dp_drr;
312
313 if (fwrite(drr, sizeof (dmu_replay_record_t), 1, context->ic_fp) != 1) {
314 err(1, "error writing record header");
315 } else if (item->dp_payload_size > 0) {
316 size_t n_written = fwrite(item->dp_payload,
317 item->dp_payload_size, 1, context->ic_fp);
318 if (n_written != 1) {
319 err(1, "error writing payload");
320 } else {
321 free(item->dp_payload);
322 item->dp_payload = NULL;
323 }
324 }
325
326 uint32_t drr_type = OPTION_ENABLED(CA_BYTESWAP_ON_OUTPUT) ?
327 BSWAP_32(drr->drr_type) : drr->drr_type;
328
329 record_stats_t *stats = &chain_attrs->ca_stats_out[drr_type];
330 stats->rs_num_records++;
331 stats->rs_total_header_bytes += sizeof (dmu_replay_record_t);
332 stats->rs_total_payload_bytes += item->dp_payload_size;
333
334 stats = &chain_attrs->ca_totals_out;
335 stats->rs_num_records++;
336 stats->rs_total_header_bytes += sizeof (dmu_replay_record_t);
337 stats->rs_total_payload_bytes += item->dp_payload_size;
338
339 return (D_OK);
340 }
341
342 /*
343 * Even if the chain doesn't write out a stream, payloads still need freed.
344 */
345 static disposition_t
chain_null_output(drr_packet_t * item,void * context)346 chain_null_output(drr_packet_t *item, void *context)
347 {
348 (void) context;
349 if (item && item->dp_payload != NULL && item->dp_payload_size > 0) {
350 free(item->dp_payload);
351 item->dp_payload = NULL;
352 item->dp_payload_size = 0;
353 }
354 return (D_OK);
355 }
356
357 /*
358 * Storage for the filename must remain valid during chain execution
359 */
360 static chain_step_t
setup_io(const char * filename,boolean_t for_reading)361 setup_io(const char *filename, boolean_t for_reading)
362 {
363 int context_num = next_io_context++ % MAX_IO_STREAMS;
364
365 io_context_t context = {
366 .ic_filename = filename,
367 .ic_for_reading = for_reading
368 };
369 io_contexts[context_num] = context;
370
371 chain_step_t step = {
372 .cs_type = CS_SERIAL,
373 .cs_in_size = 0,
374 .cs_out_size = sizeof (drr_packet_t),
375 .cs_context = &io_contexts[context_num],
376 .cs_serial = {
377 .process = (zc_serial_process_f *)
378 (for_reading ? chain_read : chain_write),
379 }
380 };
381 return (step);
382 }
383
384 chain_step_t
serial_read_stream(const char * filename)385 serial_read_stream(const char *filename)
386 {
387 return (setup_io(filename, B_TRUE));
388 }
389
390 chain_step_t
serial_write_stream(const char * filename)391 serial_write_stream(const char *filename)
392 {
393 return (setup_io(filename, B_FALSE));
394 }
395
396 chain_step_t
serial_null_output(void)397 serial_null_output(void)
398 {
399 chain_step_t step = {
400 .cs_type = CS_SERIAL,
401 .cs_in_size = sizeof (drr_packet_t),
402 .cs_out_size = 0,
403 .cs_context = NULL,
404 .cs_serial = {
405 .process = (zc_serial_process_f *)chain_null_output
406 }
407 };
408 return (step);
409 }
410
411 static disposition_t
chain_checkpoint(drr_packet_t * item,checkpoint_context_t * ctxt)412 chain_checkpoint(drr_packet_t *item, checkpoint_context_t *ctxt)
413 {
414 struct timespec now;
415 char buff[32];
416 uint64_t delta_b, dbdt;
417 double now_sec, delta_t;
418
419 if (item == NULL)
420 return (D_OK);
421
422 clock_gettime(CLOCK_MONOTONIC, &now);
423 now_sec = now.tv_sec + (double)now.tv_nsec / 1E9;
424 if (ctxt->cc_last_sec > 1E-9) {
425 delta_t = now_sec - ctxt->cc_last_sec;
426 if (delta_t < ctxt->cc_period_sec)
427 return (D_OK);
428 delta_b = item->dp_stream_offset - ctxt->cc_last_bytes;
429 dbdt = delta_b / delta_t;
430 zfs_nicenum(dbdt, buff, sizeof (buff));
431 fprintf(stderr, "Checkpoint %s: %s/s\n", ctxt->cc_name, buff);
432 }
433 ctxt->cc_last_sec = now_sec;
434 ctxt->cc_last_bytes = item->dp_stream_offset;
435 return (D_OK);
436 }
437
438 /*
439 * Storage for name must remain valid throughout chain execution
440 */
441 chain_step_t
serial_checkpoint(const char * name)442 serial_checkpoint(const char *name)
443 {
444 int context_no = next_checkpoint_context++ % MAX_IO_STREAMS;
445
446 checkpoint_context_t context = {
447 .cc_name = name,
448 .cc_period_sec = 1.0
449 };
450 checkpoint_contexts[context_no] = context;
451
452 chain_step_t step = {
453 .cs_type = CS_SERIAL,
454 .cs_in_size = sizeof (drr_packet_t),
455 .cs_out_size = sizeof (drr_packet_t),
456 .cs_context = &checkpoint_contexts[context_no],
457 .cs_serial = {
458 .process = (zc_serial_process_f *)chain_checkpoint
459 },
460 };
461 return (step);
462 }
463