1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or https://opensource.org/licenses/CDDL-1.0. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2022 Axcient. All rights reserved. 24 * Use is subject to license terms. 25 * 26 * Copyright (c) 2022 by Delphix. All rights reserved. 27 * Copyright (c) 2024, Klara, Inc. 28 */ 29 30 #include <err.h> 31 #include <stdio.h> 32 #include <stdlib.h> 33 #include <unistd.h> 34 #include <sys/zfs_ioctl.h> 35 #include <sys/zio_checksum.h> 36 #include <sys/zstd/zstd.h> 37 #include "zfs_fletcher.h" 38 #include "zstream.h" 39 40 static int 41 dump_record(dmu_replay_record_t *drr, void *payload, int payload_len, 42 zio_cksum_t *zc, int outfd) 43 { 44 assert(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum) 45 == sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t)); 46 fletcher_4_incremental_native(drr, 47 offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), zc); 48 if (drr->drr_type != DRR_BEGIN) { 49 assert(ZIO_CHECKSUM_IS_ZERO(&drr->drr_u. 50 drr_checksum.drr_checksum)); 51 drr->drr_u.drr_checksum.drr_checksum = *zc; 52 } 53 fletcher_4_incremental_native(&drr->drr_u.drr_checksum.drr_checksum, 54 sizeof (zio_cksum_t), zc); 55 if (write(outfd, drr, sizeof (*drr)) == -1) 56 return (errno); 57 if (payload_len != 0) { 58 fletcher_4_incremental_native(payload, payload_len, zc); 59 if (write(outfd, payload, payload_len) == -1) 60 return (errno); 61 } 62 return (0); 63 } 64 65 int 66 zstream_do_recompress(int argc, char *argv[]) 67 { 68 int bufsz = SPA_MAXBLOCKSIZE; 69 char *buf = safe_malloc(bufsz); 70 dmu_replay_record_t thedrr; 71 dmu_replay_record_t *drr = &thedrr; 72 zio_cksum_t stream_cksum; 73 int c; 74 int level = 0; 75 76 while ((c = getopt(argc, argv, "l:")) != -1) { 77 switch (c) { 78 case 'l': 79 if (sscanf(optarg, "%d", &level) != 1) { 80 fprintf(stderr, 81 "failed to parse level '%s'\n", 82 optarg); 83 zstream_usage(); 84 } 85 break; 86 case '?': 87 (void) fprintf(stderr, "invalid option '%c'\n", 88 optopt); 89 zstream_usage(); 90 break; 91 } 92 } 93 94 argc -= optind; 95 argv += optind; 96 97 if (argc != 1) 98 zstream_usage(); 99 100 enum zio_compress ctype; 101 if (strcmp(argv[0], "off") == 0) { 102 ctype = ZIO_COMPRESS_OFF; 103 } else { 104 for (ctype = 0; ctype < ZIO_COMPRESS_FUNCTIONS; ctype++) { 105 if (strcmp(argv[0], 106 zio_compress_table[ctype].ci_name) == 0) 107 break; 108 } 109 if (ctype == ZIO_COMPRESS_FUNCTIONS || 110 zio_compress_table[ctype].ci_compress == NULL) { 111 fprintf(stderr, "Invalid compression type %s.\n", 112 argv[0]); 113 exit(2); 114 } 115 } 116 117 if (isatty(STDIN_FILENO)) { 118 (void) fprintf(stderr, 119 "Error: The send stream is a binary format " 120 "and can not be read from a\n" 121 "terminal. Standard input must be redirected.\n"); 122 exit(1); 123 } 124 125 abd_init(); 126 fletcher_4_init(); 127 zio_init(); 128 zstd_init(); 129 int begin = 0; 130 boolean_t seen = B_FALSE; 131 while (sfread(drr, sizeof (*drr), stdin) != 0) { 132 struct drr_write *drrw; 133 uint64_t payload_size = 0; 134 135 /* 136 * We need to regenerate the checksum. 137 */ 138 if (drr->drr_type != DRR_BEGIN) { 139 memset(&drr->drr_u.drr_checksum.drr_checksum, 0, 140 sizeof (drr->drr_u.drr_checksum.drr_checksum)); 141 } 142 143 144 switch (drr->drr_type) { 145 case DRR_BEGIN: 146 { 147 ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0); 148 VERIFY0(begin++); 149 seen = B_TRUE; 150 151 uint32_t sz = drr->drr_payloadlen; 152 153 VERIFY3U(sz, <=, 1U << 28); 154 155 if (sz != 0) { 156 if (sz > bufsz) { 157 buf = realloc(buf, sz); 158 if (buf == NULL) 159 err(1, "realloc"); 160 bufsz = sz; 161 } 162 (void) sfread(buf, sz, stdin); 163 } 164 payload_size = sz; 165 break; 166 } 167 case DRR_END: 168 { 169 struct drr_end *drre = &drr->drr_u.drr_end; 170 /* 171 * We would prefer to just check --begin == 0, but 172 * replication streams have an end of stream END 173 * record, so we must avoid tripping it. 174 */ 175 VERIFY3B(seen, ==, B_TRUE); 176 begin--; 177 /* 178 * Use the recalculated checksum, unless this is 179 * the END record of a stream package, which has 180 * no checksum. 181 */ 182 if (!ZIO_CHECKSUM_IS_ZERO(&drre->drr_checksum)) 183 drre->drr_checksum = stream_cksum; 184 break; 185 } 186 187 case DRR_OBJECT: 188 { 189 struct drr_object *drro = &drr->drr_u.drr_object; 190 VERIFY3S(begin, ==, 1); 191 192 if (drro->drr_bonuslen > 0) { 193 payload_size = DRR_OBJECT_PAYLOAD_SIZE(drro); 194 (void) sfread(buf, payload_size, stdin); 195 } 196 break; 197 } 198 199 case DRR_SPILL: 200 { 201 struct drr_spill *drrs = &drr->drr_u.drr_spill; 202 VERIFY3S(begin, ==, 1); 203 payload_size = DRR_SPILL_PAYLOAD_SIZE(drrs); 204 (void) sfread(buf, payload_size, stdin); 205 break; 206 } 207 208 case DRR_WRITE_BYREF: 209 VERIFY3S(begin, ==, 1); 210 fprintf(stderr, 211 "Deduplicated streams are not supported\n"); 212 exit(1); 213 break; 214 215 case DRR_WRITE: 216 { 217 VERIFY3S(begin, ==, 1); 218 drrw = &thedrr.drr_u.drr_write; 219 payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw); 220 /* 221 * In order to recompress an encrypted block, you have 222 * to decrypt, decompress, recompress, and 223 * re-encrypt. That can be a future enhancement (along 224 * with decryption or re-encryption), but for now we 225 * skip encrypted blocks. 226 */ 227 boolean_t encrypted = B_FALSE; 228 for (int i = 0; i < ZIO_DATA_SALT_LEN; i++) { 229 if (drrw->drr_salt[i] != 0) { 230 encrypted = B_TRUE; 231 break; 232 } 233 } 234 if (encrypted) { 235 (void) sfread(buf, payload_size, stdin); 236 break; 237 } 238 enum zio_compress dtype = drrw->drr_compressiontype; 239 if (dtype >= ZIO_COMPRESS_FUNCTIONS) { 240 fprintf(stderr, "Invalid compression type in " 241 "stream: %d\n", dtype); 242 exit(3); 243 } 244 if (zio_compress_table[dtype].ci_decompress == NULL) 245 dtype = ZIO_COMPRESS_OFF; 246 247 /* Set up buffers to minimize memcpys */ 248 char *cbuf, *dbuf; 249 if (ctype == ZIO_COMPRESS_OFF) 250 dbuf = buf; 251 else 252 dbuf = safe_calloc(bufsz); 253 254 if (dtype == ZIO_COMPRESS_OFF) 255 cbuf = dbuf; 256 else 257 cbuf = safe_calloc(payload_size); 258 259 /* Read and decompress the payload */ 260 (void) sfread(cbuf, payload_size, stdin); 261 if (dtype != ZIO_COMPRESS_OFF) { 262 abd_t cabd, dabd; 263 abd_get_from_buf_struct(&cabd, 264 cbuf, payload_size); 265 abd_get_from_buf_struct(&dabd, dbuf, 266 MIN(bufsz, drrw->drr_logical_size)); 267 if (zio_decompress_data(dtype, &cabd, &dabd, 268 payload_size, abd_get_size(&dabd), 269 NULL) != 0) { 270 warnx("decompression type %d failed " 271 "for ino %llu offset %llu", 272 dtype, 273 (u_longlong_t)drrw->drr_object, 274 (u_longlong_t)drrw->drr_offset); 275 exit(4); 276 } 277 payload_size = drrw->drr_logical_size; 278 abd_free(&dabd); 279 abd_free(&cabd); 280 free(cbuf); 281 } 282 283 /* Recompress the payload */ 284 if (ctype != ZIO_COMPRESS_OFF) { 285 abd_t dabd, abd; 286 abd_get_from_buf_struct(&dabd, 287 dbuf, drrw->drr_logical_size); 288 abd_t *pabd = 289 abd_get_from_buf_struct(&abd, buf, bufsz); 290 size_t csize = zio_compress_data(ctype, &dabd, 291 &pabd, drrw->drr_logical_size, 292 drrw->drr_logical_size, level); 293 size_t rounded = 294 P2ROUNDUP(csize, SPA_MINBLOCKSIZE); 295 if (rounded >= drrw->drr_logical_size) { 296 memcpy(buf, dbuf, payload_size); 297 drrw->drr_compressiontype = 0; 298 drrw->drr_compressed_size = 0; 299 } else { 300 abd_zero_off(pabd, csize, 301 rounded - csize); 302 drrw->drr_compressiontype = ctype; 303 drrw->drr_compressed_size = 304 payload_size = rounded; 305 } 306 abd_free(&abd); 307 abd_free(&dabd); 308 free(dbuf); 309 } else { 310 drrw->drr_compressiontype = 0; 311 drrw->drr_compressed_size = 0; 312 } 313 break; 314 } 315 316 case DRR_WRITE_EMBEDDED: 317 { 318 struct drr_write_embedded *drrwe = 319 &drr->drr_u.drr_write_embedded; 320 VERIFY3S(begin, ==, 1); 321 payload_size = 322 P2ROUNDUP((uint64_t)drrwe->drr_psize, 8); 323 (void) sfread(buf, payload_size, stdin); 324 break; 325 } 326 327 case DRR_FREEOBJECTS: 328 case DRR_FREE: 329 case DRR_OBJECT_RANGE: 330 VERIFY3S(begin, ==, 1); 331 break; 332 333 default: 334 (void) fprintf(stderr, "INVALID record type 0x%x\n", 335 drr->drr_type); 336 /* should never happen, so assert */ 337 assert(B_FALSE); 338 } 339 340 if (feof(stdout)) { 341 fprintf(stderr, "Error: unexpected end-of-file\n"); 342 exit(1); 343 } 344 if (ferror(stdout)) { 345 fprintf(stderr, "Error while reading file: %s\n", 346 strerror(errno)); 347 exit(1); 348 } 349 350 /* 351 * We need to recalculate the checksum, and it needs to be 352 * initially zero to do that. BEGIN records don't have 353 * a checksum. 354 */ 355 if (drr->drr_type != DRR_BEGIN) { 356 memset(&drr->drr_u.drr_checksum.drr_checksum, 0, 357 sizeof (drr->drr_u.drr_checksum.drr_checksum)); 358 } 359 if (dump_record(drr, buf, payload_size, 360 &stream_cksum, STDOUT_FILENO) != 0) 361 break; 362 if (drr->drr_type == DRR_END) { 363 /* 364 * Typically the END record is either the last 365 * thing in the stream, or it is followed 366 * by a BEGIN record (which also zeros the checksum). 367 * However, a stream package ends with two END 368 * records. The last END record's checksum starts 369 * from zero. 370 */ 371 ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0); 372 } 373 } 374 free(buf); 375 fletcher_4_fini(); 376 zio_fini(); 377 zstd_fini(); 378 abd_fini(); 379 380 return (0); 381 } 382