1*d0b3ecdcSMartin Matuska // SPDX-License-Identifier: CDDL-1.0
2*d0b3ecdcSMartin Matuska /*
3*d0b3ecdcSMartin Matuska * CDDL HEADER START
4*d0b3ecdcSMartin Matuska *
5*d0b3ecdcSMartin Matuska * This file and its contents are supplied under the terms of the Common
6*d0b3ecdcSMartin Matuska * Development and Distribution License ("CDDL"), version 1.0. You may only use
7*d0b3ecdcSMartin Matuska * this file in accordance with the terms of version 1.0 of the CDDL.
8*d0b3ecdcSMartin Matuska *
9*d0b3ecdcSMartin Matuska * A full copy of the text of the CDDL should have accompanied this source. A
10*d0b3ecdcSMartin Matuska * copy of the CDDL is also available via the Internet at
11*d0b3ecdcSMartin Matuska * http://www.illumos.org/license/CDDL.
12*d0b3ecdcSMartin Matuska *
13*d0b3ecdcSMartin Matuska * CDDL HEADER END
14*d0b3ecdcSMartin Matuska */
15*d0b3ecdcSMartin Matuska
16*d0b3ecdcSMartin Matuska /*
17*d0b3ecdcSMartin Matuska * Copyright (c) 2026 by Garth Snyder. All rights reserved.
18*d0b3ecdcSMartin Matuska */
19*d0b3ecdcSMartin Matuska
20*d0b3ecdcSMartin Matuska #include <arpa/inet.h>
21*d0b3ecdcSMartin Matuska #include <err.h>
22*d0b3ecdcSMartin Matuska #include <errno.h>
23*d0b3ecdcSMartin Matuska #include <libzutil.h>
24*d0b3ecdcSMartin Matuska #include <stdio.h>
25*d0b3ecdcSMartin Matuska #include <stdlib.h>
26*d0b3ecdcSMartin Matuska #include <string.h>
27*d0b3ecdcSMartin Matuska #include <sys/byteorder.h>
28*d0b3ecdcSMartin Matuska #include <sys/stdtypes.h>
29*d0b3ecdcSMartin Matuska #include <sys/sysmacros.h>
30*d0b3ecdcSMartin Matuska #include <sys/types.h>
31*d0b3ecdcSMartin Matuska #include <sys/zfs_ioctl.h>
32*d0b3ecdcSMartin Matuska #include <time.h>
33*d0b3ecdcSMartin Matuska #include <unistd.h>
34*d0b3ecdcSMartin Matuska
35*d0b3ecdcSMartin Matuska #include "zstream_chain.h"
36*d0b3ecdcSMartin Matuska #include "zstream_modules.h"
37*d0b3ecdcSMartin Matuska #include "zstream_util.h"
38*d0b3ecdcSMartin Matuska
39*d0b3ecdcSMartin Matuska /*
40*d0b3ecdcSMartin Matuska * Init only the filename; chain functions will prepare the FILE *
41*d0b3ecdcSMartin Matuska */
42*d0b3ecdcSMartin Matuska typedef struct {
43*d0b3ecdcSMartin Matuska const char *ic_filename;
44*d0b3ecdcSMartin Matuska FILE *ic_fp;
45*d0b3ecdcSMartin Matuska boolean_t ic_for_reading;
46*d0b3ecdcSMartin Matuska off_t ic_offset;
47*d0b3ecdcSMartin Matuska } io_context_t;
48*d0b3ecdcSMartin Matuska
49*d0b3ecdcSMartin Matuska typedef struct {
50*d0b3ecdcSMartin Matuska const char *cc_name;
51*d0b3ecdcSMartin Matuska double cc_last_sec;
52*d0b3ecdcSMartin Matuska double cc_period_sec;
53*d0b3ecdcSMartin Matuska uint64_t cc_last_bytes;
54*d0b3ecdcSMartin Matuska } checkpoint_context_t;
55*d0b3ecdcSMartin Matuska
56*d0b3ecdcSMartin Matuska static io_context_t io_contexts[MAX_IO_STREAMS];
57*d0b3ecdcSMartin Matuska static int next_io_context = 0;
58*d0b3ecdcSMartin Matuska
59*d0b3ecdcSMartin Matuska static checkpoint_context_t checkpoint_contexts[MAX_IO_STREAMS];
60*d0b3ecdcSMartin Matuska static int next_checkpoint_context = 0;
61*d0b3ecdcSMartin Matuska
62*d0b3ecdcSMartin Matuska /*
63*d0b3ecdcSMartin Matuska * Run from within chain execution to initialize I/O. A NULL filename
64*d0b3ecdcSMartin Matuska * indicates stdin or stdout.
65*d0b3ecdcSMartin Matuska */
66*d0b3ecdcSMartin Matuska static void
open_file(io_context_t * context)67*d0b3ecdcSMartin Matuska open_file(io_context_t *context)
68*d0b3ecdcSMartin Matuska {
69*d0b3ecdcSMartin Matuska if (context->ic_filename) {
70*d0b3ecdcSMartin Matuska context->ic_fp = fopen(context->ic_filename,
71*d0b3ecdcSMartin Matuska context->ic_for_reading ? "rb" : "wb+");
72*d0b3ecdcSMartin Matuska if (!context->ic_fp) {
73*d0b3ecdcSMartin Matuska perror(context->ic_filename);
74*d0b3ecdcSMartin Matuska exit(1);
75*d0b3ecdcSMartin Matuska }
76*d0b3ecdcSMartin Matuska } else if (context->ic_for_reading && isatty(STDIN_FILENO)) {
77*d0b3ecdcSMartin Matuska errx(1, "stream cannot be read from a terminal. "
78*d0b3ecdcSMartin Matuska "Name a file or take input from a pipe.");
79*d0b3ecdcSMartin Matuska } else if (context->ic_for_reading) {
80*d0b3ecdcSMartin Matuska context->ic_fp = stdin;
81*d0b3ecdcSMartin Matuska } else if (isatty(STDOUT_FILENO)) {
82*d0b3ecdcSMartin Matuska errx(1, "stream cannot be written to a terminal. "
83*d0b3ecdcSMartin Matuska "Capture output to a file or pipe to another command.");
84*d0b3ecdcSMartin Matuska } else {
85*d0b3ecdcSMartin Matuska context->ic_fp = stdout;
86*d0b3ecdcSMartin Matuska }
87*d0b3ecdcSMartin Matuska }
88*d0b3ecdcSMartin Matuska
89*d0b3ecdcSMartin Matuska /*
90*d0b3ecdcSMartin Matuska * Extract the payload size from a replay record that is potentially
91*d0b3ecdcSMartin Matuska * byteswapped. We want to leave the bulk of byteswapping to another module,
92*d0b3ecdcSMartin Matuska * so just take a quick, nondestructive peek.
93*d0b3ecdcSMartin Matuska *
94*d0b3ecdcSMartin Matuska * Record-specific macros such as DRR_WRITE_PAYLOAD_SIZE do not seem to be
95*d0b3ecdcSMartin Matuska * byteswap-aware. However, with the exception of DRR_OBJECT_PAYLOAD_SIZE,
96*d0b3ecdcSMartin Matuska * they happen to work with post-swapping since they are switching on either
97*d0b3ecdcSMartin Matuska * a uint8_t value or 0.
98*d0b3ecdcSMartin Matuska *
99*d0b3ecdcSMartin Matuska * DRR_WRITE and DRR_SPILL use 64-bit sizes. The other two record types have
100*d0b3ecdcSMartin Matuska * 32-bit sizes. The drr_payloadlen field shared by all record types (but
101*d0b3ecdcSMartin Matuska * used only by BEGIN records is also 32 bits.
102*d0b3ecdcSMartin Matuska */
103*d0b3ecdcSMartin Matuska static size_t
calc_payload_size(dmu_replay_record_t * drr)104*d0b3ecdcSMartin Matuska calc_payload_size(dmu_replay_record_t *drr)
105*d0b3ecdcSMartin Matuska {
106*d0b3ecdcSMartin Matuska struct drr_object *drro = &drr->drr_u.drr_object;
107*d0b3ecdcSMartin Matuska struct drr_write *drrw = &drr->drr_u.drr_write;
108*d0b3ecdcSMartin Matuska struct drr_spill *drrs = &drr->drr_u.drr_spill;
109*d0b3ecdcSMartin Matuska struct drr_write_embedded *drrwe = &drr->drr_u.drr_write_embedded;
110*d0b3ecdcSMartin Matuska
111*d0b3ecdcSMartin Matuska boolean_t swap = ATTR_IS_SET(CA_BYTESWAPPED);
112*d0b3ecdcSMartin Matuska uint32_t drr_type = swap ? BSWAP_32(drr->drr_type) : drr->drr_type;
113*d0b3ecdcSMartin Matuska uint64_t size, size64 = 0;
114*d0b3ecdcSMartin Matuska uint32_t size32 = 0;
115*d0b3ecdcSMartin Matuska boolean_t round = B_FALSE;
116*d0b3ecdcSMartin Matuska
117*d0b3ecdcSMartin Matuska if (drr_type == DRR_OBJECT) {
118*d0b3ecdcSMartin Matuska round = drro->drr_raw_bonuslen == 0;
119*d0b3ecdcSMartin Matuska size32 = round ? drro->drr_bonuslen : drro->drr_raw_bonuslen;
120*d0b3ecdcSMartin Matuska } else if (drr_type == DRR_WRITE) {
121*d0b3ecdcSMartin Matuska size64 = DRR_WRITE_PAYLOAD_SIZE(drrw);
122*d0b3ecdcSMartin Matuska } else if (drr_type == DRR_SPILL) {
123*d0b3ecdcSMartin Matuska size64 = DRR_SPILL_PAYLOAD_SIZE(drrs);
124*d0b3ecdcSMartin Matuska } else if (drr_type == DRR_WRITE_EMBEDDED) {
125*d0b3ecdcSMartin Matuska size32 = drrwe->drr_psize;
126*d0b3ecdcSMartin Matuska round = B_TRUE;
127*d0b3ecdcSMartin Matuska } else if (drr_type == DRR_BEGIN) {
128*d0b3ecdcSMartin Matuska size32 = drr->drr_payloadlen;
129*d0b3ecdcSMartin Matuska } else {
130*d0b3ecdcSMartin Matuska return (0);
131*d0b3ecdcSMartin Matuska }
132*d0b3ecdcSMartin Matuska if (size32 != 0) {
133*d0b3ecdcSMartin Matuska size = swap ? BSWAP_32(size32) : size32;
134*d0b3ecdcSMartin Matuska } else {
135*d0b3ecdcSMartin Matuska size = swap ? BSWAP_64(size64) : size64;
136*d0b3ecdcSMartin Matuska }
137*d0b3ecdcSMartin Matuska return (round ? P2ROUNDUP(size, 8) : size);
138*d0b3ecdcSMartin Matuska }
139*d0b3ecdcSMartin Matuska
140*d0b3ecdcSMartin Matuska /*
141*d0b3ecdcSMartin Matuska * Must be called only with the first record in a stream. Must be a
142*d0b3ecdcSMartin Matuska * DRR_BEGIN record or we'll terminate with "invalid stream".
143*d0b3ecdcSMartin Matuska */
144*d0b3ecdcSMartin Matuska static void
set_stream_attributes(drr_packet_t * item)145*d0b3ecdcSMartin Matuska set_stream_attributes(drr_packet_t *item)
146*d0b3ecdcSMartin Matuska {
147*d0b3ecdcSMartin Matuska dmu_replay_record_t *drr = &item->dp_drr;
148*d0b3ecdcSMartin Matuska struct drr_begin *drrb = &drr->drr_u.drr_begin;
149*d0b3ecdcSMartin Matuska uint64_t magic = drrb->drr_magic;
150*d0b3ecdcSMartin Matuska uint64_t versioninfo = drrb->drr_versioninfo;
151*d0b3ecdcSMartin Matuska boolean_t i_am_big_endian = htonl(0xFF00) == 0xFF00;
152*d0b3ecdcSMartin Matuska
153*d0b3ecdcSMartin Matuska boolean_t swap_on_output, is_deduped;
154*d0b3ecdcSMartin Matuska
155*d0b3ecdcSMartin Matuska if (magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
156*d0b3ecdcSMartin Matuska SET_ATTR(CA_BYTESWAPPED);
157*d0b3ecdcSMartin Matuska versioninfo = BSWAP_64(versioninfo);
158*d0b3ecdcSMartin Matuska } else if (magic != DMU_BACKUP_MAGIC) {
159*d0b3ecdcSMartin Matuska errx(1, "invalid ZFS stream, bad magic number %llx",
160*d0b3ecdcSMartin Matuska (u_longlong_t)magic);
161*d0b3ecdcSMartin Matuska }
162*d0b3ecdcSMartin Matuska if (i_am_big_endian == ATTR_IS_SET(CA_BYTESWAPPED)) {
163*d0b3ecdcSMartin Matuska SET_ATTR(CA_LITTLE_ENDIAN_INPUT);
164*d0b3ecdcSMartin Matuska } else {
165*d0b3ecdcSMartin Matuska SET_ATTR(CA_BIG_ENDIAN_INPUT);
166*d0b3ecdcSMartin Matuska }
167*d0b3ecdcSMartin Matuska chain_attrs->ca_feature_flags = DMU_GET_FEATUREFLAGS(versioninfo);
168*d0b3ecdcSMartin Matuska
169*d0b3ecdcSMartin Matuska is_deduped =
170*d0b3ecdcSMartin Matuska STREAM_HAS_FEATURE(DMU_BACKUP_FEATURE_DEDUP) ||
171*d0b3ecdcSMartin Matuska STREAM_HAS_FEATURE(DMU_BACKUP_FEATURE_DEDUPPROPS);
172*d0b3ecdcSMartin Matuska
173*d0b3ecdcSMartin Matuska if (OPTION_ENABLED(CA_FORBID_DEDUP) && is_deduped) {
174*d0b3ecdcSMartin Matuska errx(1, "input stream is deduplicated, but this subcommand "
175*d0b3ecdcSMartin Matuska "does not support deduplicated streams. Use 'zstream "
176*d0b3ecdcSMartin Matuska "redup' to reduplicate.");
177*d0b3ecdcSMartin Matuska }
178*d0b3ecdcSMartin Matuska boolean_t req_dedup = OPTION_ENABLED(CA_REQUIRE_DEDUP);
179*d0b3ecdcSMartin Matuska boolean_t is_dedup = STREAM_HAS_FEATURE(DMU_BACKUP_FEATURE_DEDUP);
180*d0b3ecdcSMartin Matuska if (req_dedup && !is_dedup) {
181*d0b3ecdcSMartin Matuska errx(1, "this subcommand requires a deduplicated input "
182*d0b3ecdcSMartin Matuska "stream, but the stream is not deduplicated");
183*d0b3ecdcSMartin Matuska }
184*d0b3ecdcSMartin Matuska boolean_t req_native = OPTION_ENABLED(CA_REQUIRE_NATIVE_ENDIAN);
185*d0b3ecdcSMartin Matuska boolean_t is_byteswapped = ATTR_IS_SET(CA_BYTESWAPPED);
186*d0b3ecdcSMartin Matuska if (req_native && is_byteswapped) {
187*d0b3ecdcSMartin Matuska errx(1, "this subcommand requires a native-endian "
188*d0b3ecdcSMartin Matuska "input stream");
189*d0b3ecdcSMartin Matuska }
190*d0b3ecdcSMartin Matuska
191*d0b3ecdcSMartin Matuska /*
192*d0b3ecdcSMartin Matuska * Figure out output endianness. In the absence of explicit byte
193*d0b3ecdcSMartin Matuska * order instructions, we default to preserving the input byte
194*d0b3ecdcSMartin Matuska * order. Record headers are always converted to native byte order
195*d0b3ecdcSMartin Matuska * for processing, but they can be swapped back on output.
196*d0b3ecdcSMartin Matuska *
197*d0b3ecdcSMartin Matuska * zfs receive inspects the endianness of each DRR record
198*d0b3ecdcSMartin Matuska * and assumes, at least in some cases, that payload data has the
199*d0b3ecdcSMartin Matuska * same order as the DMU wrappers.
200*d0b3ecdcSMartin Matuska */
201*d0b3ecdcSMartin Matuska if (OPTION_ENABLED(CA_BIG_ENDIAN_OUT))
202*d0b3ecdcSMartin Matuska swap_on_output = !i_am_big_endian;
203*d0b3ecdcSMartin Matuska else if (OPTION_ENABLED(CA_LITTLE_ENDIAN_OUT))
204*d0b3ecdcSMartin Matuska swap_on_output = i_am_big_endian;
205*d0b3ecdcSMartin Matuska else if (OPTION_ENABLED(CA_OPPOSITE_ENDIAN_OUT))
206*d0b3ecdcSMartin Matuska swap_on_output = !ATTR_IS_SET(CA_BYTESWAPPED);
207*d0b3ecdcSMartin Matuska else
208*d0b3ecdcSMartin Matuska swap_on_output = ATTR_IS_SET(CA_BYTESWAPPED);
209*d0b3ecdcSMartin Matuska
210*d0b3ecdcSMartin Matuska if (swap_on_output) {
211*d0b3ecdcSMartin Matuska ENABLE_OPTION(chain_attrs, CA_BYTESWAP_ON_OUTPUT);
212*d0b3ecdcSMartin Matuska }
213*d0b3ecdcSMartin Matuska }
214*d0b3ecdcSMartin Matuska
215*d0b3ecdcSMartin Matuska static disposition_t
chain_read(drr_packet_t * item,io_context_t * context)216*d0b3ecdcSMartin Matuska chain_read(drr_packet_t *item, io_context_t *context)
217*d0b3ecdcSMartin Matuska {
218*d0b3ecdcSMartin Matuska if (item == NULL)
219*d0b3ecdcSMartin Matuska return (D_OK);
220*d0b3ecdcSMartin Matuska
221*d0b3ecdcSMartin Matuska dmu_replay_record_t *drr = &item->dp_drr;
222*d0b3ecdcSMartin Matuska
223*d0b3ecdcSMartin Matuska if (!context->ic_fp)
224*d0b3ecdcSMartin Matuska open_file(context);
225*d0b3ecdcSMartin Matuska
226*d0b3ecdcSMartin Matuska if (fread(drr, sizeof (dmu_replay_record_t), 1, context->ic_fp) != 1) {
227*d0b3ecdcSMartin Matuska if (ferror(context->ic_fp)) {
228*d0b3ecdcSMartin Matuska err(1, "error reading record header at offset %llu",
229*d0b3ecdcSMartin Matuska (u_longlong_t)context->ic_offset);
230*d0b3ecdcSMartin Matuska }
231*d0b3ecdcSMartin Matuska fclose(context->ic_fp);
232*d0b3ecdcSMartin Matuska return (D_EOF);
233*d0b3ecdcSMartin Matuska }
234*d0b3ecdcSMartin Matuska
235*d0b3ecdcSMartin Matuska if (context->ic_offset == 0)
236*d0b3ecdcSMartin Matuska set_stream_attributes(item);
237*d0b3ecdcSMartin Matuska
238*d0b3ecdcSMartin Matuska size_t payload_size = calc_payload_size(&item->dp_drr);
239*d0b3ecdcSMartin Matuska if (payload_size > UINT32_MAX) {
240*d0b3ecdcSMartin Matuska errx(1, "stated packet size is greater than uint32_t"
241*d0b3ecdcSMartin Matuska "at offset %llu", (u_longlong_t)context->ic_offset);
242*d0b3ecdcSMartin Matuska }
243*d0b3ecdcSMartin Matuska item->dp_payload_size = payload_size;
244*d0b3ecdcSMartin Matuska if (item->dp_payload_size > 0) {
245*d0b3ecdcSMartin Matuska item->dp_payload = safe_malloc(item->dp_payload_size);
246*d0b3ecdcSMartin Matuska size_t n_read = fread(item->dp_payload, item->dp_payload_size,
247*d0b3ecdcSMartin Matuska 1, context->ic_fp);
248*d0b3ecdcSMartin Matuska if (n_read != 1) {
249*d0b3ecdcSMartin Matuska if (ferror(context->ic_fp)) {
250*d0b3ecdcSMartin Matuska err(1, "error reading record payload at "
251*d0b3ecdcSMartin Matuska " offset %llu",
252*d0b3ecdcSMartin Matuska (u_longlong_t)context->ic_offset);
253*d0b3ecdcSMartin Matuska } else {
254*d0b3ecdcSMartin Matuska /*
255*d0b3ecdcSMartin Matuska * We can't exit here because the ZFS test
256*d0b3ecdcSMartin Matuska * suite depends on being able to process
257*d0b3ecdcSMartin Matuska * streams truncated at random places.
258*d0b3ecdcSMartin Matuska */
259*d0b3ecdcSMartin Matuska warnx("input ends mid-record at offset %llu "
260*d0b3ecdcSMartin Matuska "- stream is likely corrupt",
261*d0b3ecdcSMartin Matuska (u_longlong_t)context->ic_offset);
262*d0b3ecdcSMartin Matuska fclose(context->ic_fp);
263*d0b3ecdcSMartin Matuska free(item->dp_payload);
264*d0b3ecdcSMartin Matuska return (D_EOF);
265*d0b3ecdcSMartin Matuska }
266*d0b3ecdcSMartin Matuska }
267*d0b3ecdcSMartin Matuska } else {
268*d0b3ecdcSMartin Matuska item->dp_payload = NULL;
269*d0b3ecdcSMartin Matuska }
270*d0b3ecdcSMartin Matuska item->dp_stream_offset = context->ic_offset;
271*d0b3ecdcSMartin Matuska
272*d0b3ecdcSMartin Matuska uint32_t drr_type = ATTR_IS_SET(CA_BYTESWAPPED) ?
273*d0b3ecdcSMartin Matuska BSWAP_32(drr->drr_type) : drr->drr_type;
274*d0b3ecdcSMartin Matuska
275*d0b3ecdcSMartin Matuska if (drr_type >= DRR_NUMTYPES) {
276*d0b3ecdcSMartin Matuska err(1, "invalid record type %llu found at offset %llu",
277*d0b3ecdcSMartin Matuska (u_longlong_t)drr_type, (u_longlong_t)context->ic_offset);
278*d0b3ecdcSMartin Matuska }
279*d0b3ecdcSMartin Matuska
280*d0b3ecdcSMartin Matuska context->ic_offset += sizeof (*drr) + item->dp_payload_size;
281*d0b3ecdcSMartin Matuska
282*d0b3ecdcSMartin Matuska record_stats_t *stats = &chain_attrs->ca_stats_in[drr_type];
283*d0b3ecdcSMartin Matuska stats->rs_num_records++;
284*d0b3ecdcSMartin Matuska stats->rs_total_header_bytes += sizeof (dmu_replay_record_t);
285*d0b3ecdcSMartin Matuska stats->rs_total_payload_bytes += item->dp_payload_size;
286*d0b3ecdcSMartin Matuska
287*d0b3ecdcSMartin Matuska stats = &chain_attrs->ca_totals_in;
288*d0b3ecdcSMartin Matuska stats->rs_num_records++;
289*d0b3ecdcSMartin Matuska stats->rs_total_header_bytes += sizeof (dmu_replay_record_t);
290*d0b3ecdcSMartin Matuska stats->rs_total_payload_bytes += item->dp_payload_size;
291*d0b3ecdcSMartin Matuska
292*d0b3ecdcSMartin Matuska return (D_OK);
293*d0b3ecdcSMartin Matuska }
294*d0b3ecdcSMartin Matuska
295*d0b3ecdcSMartin Matuska static disposition_t
chain_write(drr_packet_t * item,io_context_t * context)296*d0b3ecdcSMartin Matuska chain_write(drr_packet_t *item, io_context_t *context)
297*d0b3ecdcSMartin Matuska {
298*d0b3ecdcSMartin Matuska if (item == NULL) {
299*d0b3ecdcSMartin Matuska if (context->ic_fp) {
300*d0b3ecdcSMartin Matuska if (fclose(context->ic_fp) != 0)
301*d0b3ecdcSMartin Matuska err(1, "error closing output stream");
302*d0b3ecdcSMartin Matuska context->ic_fp = NULL;
303*d0b3ecdcSMartin Matuska }
304*d0b3ecdcSMartin Matuska return (D_OK);
305*d0b3ecdcSMartin Matuska }
306*d0b3ecdcSMartin Matuska
307*d0b3ecdcSMartin Matuska if (!context->ic_fp) {
308*d0b3ecdcSMartin Matuska open_file(context);
309*d0b3ecdcSMartin Matuska }
310*d0b3ecdcSMartin Matuska
311*d0b3ecdcSMartin Matuska dmu_replay_record_t *drr = &item->dp_drr;
312*d0b3ecdcSMartin Matuska
313*d0b3ecdcSMartin Matuska if (fwrite(drr, sizeof (dmu_replay_record_t), 1, context->ic_fp) != 1) {
314*d0b3ecdcSMartin Matuska err(1, "error writing record header");
315*d0b3ecdcSMartin Matuska } else if (item->dp_payload_size > 0) {
316*d0b3ecdcSMartin Matuska size_t n_written = fwrite(item->dp_payload,
317*d0b3ecdcSMartin Matuska item->dp_payload_size, 1, context->ic_fp);
318*d0b3ecdcSMartin Matuska if (n_written != 1) {
319*d0b3ecdcSMartin Matuska err(1, "error writing payload");
320*d0b3ecdcSMartin Matuska } else {
321*d0b3ecdcSMartin Matuska free(item->dp_payload);
322*d0b3ecdcSMartin Matuska item->dp_payload = NULL;
323*d0b3ecdcSMartin Matuska }
324*d0b3ecdcSMartin Matuska }
325*d0b3ecdcSMartin Matuska
326*d0b3ecdcSMartin Matuska uint32_t drr_type = OPTION_ENABLED(CA_BYTESWAP_ON_OUTPUT) ?
327*d0b3ecdcSMartin Matuska BSWAP_32(drr->drr_type) : drr->drr_type;
328*d0b3ecdcSMartin Matuska
329*d0b3ecdcSMartin Matuska record_stats_t *stats = &chain_attrs->ca_stats_out[drr_type];
330*d0b3ecdcSMartin Matuska stats->rs_num_records++;
331*d0b3ecdcSMartin Matuska stats->rs_total_header_bytes += sizeof (dmu_replay_record_t);
332*d0b3ecdcSMartin Matuska stats->rs_total_payload_bytes += item->dp_payload_size;
333*d0b3ecdcSMartin Matuska
334*d0b3ecdcSMartin Matuska stats = &chain_attrs->ca_totals_out;
335*d0b3ecdcSMartin Matuska stats->rs_num_records++;
336*d0b3ecdcSMartin Matuska stats->rs_total_header_bytes += sizeof (dmu_replay_record_t);
337*d0b3ecdcSMartin Matuska stats->rs_total_payload_bytes += item->dp_payload_size;
338*d0b3ecdcSMartin Matuska
339*d0b3ecdcSMartin Matuska return (D_OK);
340*d0b3ecdcSMartin Matuska }
341*d0b3ecdcSMartin Matuska
342*d0b3ecdcSMartin Matuska /*
343*d0b3ecdcSMartin Matuska * Even if the chain doesn't write out a stream, payloads still need freed.
344*d0b3ecdcSMartin Matuska */
345*d0b3ecdcSMartin Matuska static disposition_t
chain_null_output(drr_packet_t * item,void * context)346*d0b3ecdcSMartin Matuska chain_null_output(drr_packet_t *item, void *context)
347*d0b3ecdcSMartin Matuska {
348*d0b3ecdcSMartin Matuska (void) context;
349*d0b3ecdcSMartin Matuska if (item && item->dp_payload != NULL && item->dp_payload_size > 0) {
350*d0b3ecdcSMartin Matuska free(item->dp_payload);
351*d0b3ecdcSMartin Matuska item->dp_payload = NULL;
352*d0b3ecdcSMartin Matuska item->dp_payload_size = 0;
353*d0b3ecdcSMartin Matuska }
354*d0b3ecdcSMartin Matuska return (D_OK);
355*d0b3ecdcSMartin Matuska }
356*d0b3ecdcSMartin Matuska
357*d0b3ecdcSMartin Matuska /*
358*d0b3ecdcSMartin Matuska * Storage for the filename must remain valid during chain execution
359*d0b3ecdcSMartin Matuska */
360*d0b3ecdcSMartin Matuska static chain_step_t
setup_io(const char * filename,boolean_t for_reading)361*d0b3ecdcSMartin Matuska setup_io(const char *filename, boolean_t for_reading)
362*d0b3ecdcSMartin Matuska {
363*d0b3ecdcSMartin Matuska int context_num = next_io_context++ % MAX_IO_STREAMS;
364*d0b3ecdcSMartin Matuska
365*d0b3ecdcSMartin Matuska io_context_t context = {
366*d0b3ecdcSMartin Matuska .ic_filename = filename,
367*d0b3ecdcSMartin Matuska .ic_for_reading = for_reading
368*d0b3ecdcSMartin Matuska };
369*d0b3ecdcSMartin Matuska io_contexts[context_num] = context;
370*d0b3ecdcSMartin Matuska
371*d0b3ecdcSMartin Matuska chain_step_t step = {
372*d0b3ecdcSMartin Matuska .cs_type = CS_SERIAL,
373*d0b3ecdcSMartin Matuska .cs_in_size = 0,
374*d0b3ecdcSMartin Matuska .cs_out_size = sizeof (drr_packet_t),
375*d0b3ecdcSMartin Matuska .cs_context = &io_contexts[context_num],
376*d0b3ecdcSMartin Matuska .cs_serial = {
377*d0b3ecdcSMartin Matuska .process = (zc_serial_process_f *)
378*d0b3ecdcSMartin Matuska (for_reading ? chain_read : chain_write),
379*d0b3ecdcSMartin Matuska }
380*d0b3ecdcSMartin Matuska };
381*d0b3ecdcSMartin Matuska return (step);
382*d0b3ecdcSMartin Matuska }
383*d0b3ecdcSMartin Matuska
384*d0b3ecdcSMartin Matuska chain_step_t
serial_read_stream(const char * filename)385*d0b3ecdcSMartin Matuska serial_read_stream(const char *filename)
386*d0b3ecdcSMartin Matuska {
387*d0b3ecdcSMartin Matuska return (setup_io(filename, B_TRUE));
388*d0b3ecdcSMartin Matuska }
389*d0b3ecdcSMartin Matuska
390*d0b3ecdcSMartin Matuska chain_step_t
serial_write_stream(const char * filename)391*d0b3ecdcSMartin Matuska serial_write_stream(const char *filename)
392*d0b3ecdcSMartin Matuska {
393*d0b3ecdcSMartin Matuska return (setup_io(filename, B_FALSE));
394*d0b3ecdcSMartin Matuska }
395*d0b3ecdcSMartin Matuska
396*d0b3ecdcSMartin Matuska chain_step_t
serial_null_output(void)397*d0b3ecdcSMartin Matuska serial_null_output(void)
398*d0b3ecdcSMartin Matuska {
399*d0b3ecdcSMartin Matuska chain_step_t step = {
400*d0b3ecdcSMartin Matuska .cs_type = CS_SERIAL,
401*d0b3ecdcSMartin Matuska .cs_in_size = sizeof (drr_packet_t),
402*d0b3ecdcSMartin Matuska .cs_out_size = 0,
403*d0b3ecdcSMartin Matuska .cs_context = NULL,
404*d0b3ecdcSMartin Matuska .cs_serial = {
405*d0b3ecdcSMartin Matuska .process = (zc_serial_process_f *)chain_null_output
406*d0b3ecdcSMartin Matuska }
407*d0b3ecdcSMartin Matuska };
408*d0b3ecdcSMartin Matuska return (step);
409*d0b3ecdcSMartin Matuska }
410*d0b3ecdcSMartin Matuska
411*d0b3ecdcSMartin Matuska static disposition_t
chain_checkpoint(drr_packet_t * item,checkpoint_context_t * ctxt)412*d0b3ecdcSMartin Matuska chain_checkpoint(drr_packet_t *item, checkpoint_context_t *ctxt)
413*d0b3ecdcSMartin Matuska {
414*d0b3ecdcSMartin Matuska struct timespec now;
415*d0b3ecdcSMartin Matuska char buff[32];
416*d0b3ecdcSMartin Matuska uint64_t delta_b, dbdt;
417*d0b3ecdcSMartin Matuska double now_sec, delta_t;
418*d0b3ecdcSMartin Matuska
419*d0b3ecdcSMartin Matuska if (item == NULL)
420*d0b3ecdcSMartin Matuska return (D_OK);
421*d0b3ecdcSMartin Matuska
422*d0b3ecdcSMartin Matuska clock_gettime(CLOCK_MONOTONIC, &now);
423*d0b3ecdcSMartin Matuska now_sec = now.tv_sec + (double)now.tv_nsec / 1E9;
424*d0b3ecdcSMartin Matuska if (ctxt->cc_last_sec > 1E-9) {
425*d0b3ecdcSMartin Matuska delta_t = now_sec - ctxt->cc_last_sec;
426*d0b3ecdcSMartin Matuska if (delta_t < ctxt->cc_period_sec)
427*d0b3ecdcSMartin Matuska return (D_OK);
428*d0b3ecdcSMartin Matuska delta_b = item->dp_stream_offset - ctxt->cc_last_bytes;
429*d0b3ecdcSMartin Matuska dbdt = delta_b / delta_t;
430*d0b3ecdcSMartin Matuska zfs_nicenum(dbdt, buff, sizeof (buff));
431*d0b3ecdcSMartin Matuska fprintf(stderr, "Checkpoint %s: %s/s\n", ctxt->cc_name, buff);
432*d0b3ecdcSMartin Matuska }
433*d0b3ecdcSMartin Matuska ctxt->cc_last_sec = now_sec;
434*d0b3ecdcSMartin Matuska ctxt->cc_last_bytes = item->dp_stream_offset;
435*d0b3ecdcSMartin Matuska return (D_OK);
436*d0b3ecdcSMartin Matuska }
437*d0b3ecdcSMartin Matuska
438*d0b3ecdcSMartin Matuska /*
439*d0b3ecdcSMartin Matuska * Storage for name must remain valid throughout chain execution
440*d0b3ecdcSMartin Matuska */
441*d0b3ecdcSMartin Matuska chain_step_t
serial_checkpoint(const char * name)442*d0b3ecdcSMartin Matuska serial_checkpoint(const char *name)
443*d0b3ecdcSMartin Matuska {
444*d0b3ecdcSMartin Matuska int context_no = next_checkpoint_context++ % MAX_IO_STREAMS;
445*d0b3ecdcSMartin Matuska
446*d0b3ecdcSMartin Matuska checkpoint_context_t context = {
447*d0b3ecdcSMartin Matuska .cc_name = name,
448*d0b3ecdcSMartin Matuska .cc_period_sec = 1.0
449*d0b3ecdcSMartin Matuska };
450*d0b3ecdcSMartin Matuska checkpoint_contexts[context_no] = context;
451*d0b3ecdcSMartin Matuska
452*d0b3ecdcSMartin Matuska chain_step_t step = {
453*d0b3ecdcSMartin Matuska .cs_type = CS_SERIAL,
454*d0b3ecdcSMartin Matuska .cs_in_size = sizeof (drr_packet_t),
455*d0b3ecdcSMartin Matuska .cs_out_size = sizeof (drr_packet_t),
456*d0b3ecdcSMartin Matuska .cs_context = &checkpoint_contexts[context_no],
457*d0b3ecdcSMartin Matuska .cs_serial = {
458*d0b3ecdcSMartin Matuska .process = (zc_serial_process_f *)chain_checkpoint
459*d0b3ecdcSMartin Matuska },
460*d0b3ecdcSMartin Matuska };
461*d0b3ecdcSMartin Matuska return (step);
462*d0b3ecdcSMartin Matuska }
463