// SPDX-License-Identifier: CDDL-1.0
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or https://opensource.org/licenses/CDDL-1.0.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
 * Copyright (c) 2011, 2018 by Delphix. All rights reserved.
 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
 * Copyright 2014 HybridCluster. All rights reserved.
 * Copyright 2016 RackTop Systems.
 * Copyright (c) 2016 Actifio, Inc. All rights reserved.
 * Copyright (c) 2019, 2024, Klara, Inc.
 * Copyright (c) 2019, Allan Jude
 */

#include <sys/dmu.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dbuf.h>
#include <sys/dnode.h>
#include <sys/zfs_context.h>
#include <sys/dmu_objset.h>
#include <sys/dmu_traverse.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/spa_impl.h>
#include <sys/zfs_ioctl.h>
#include <sys/zap.h>
#include <sys/zio_checksum.h>
#include <sys/zfs_znode.h>
#include <zfs_fletcher.h>
#include <sys/avl.h>
#include <sys/ddt.h>
#include <sys/zfs_onexit.h>
#include <sys/dmu_send.h>
#include <sys/dmu_recv.h>
#include <sys/dsl_destroy.h>
#include <sys/blkptr.h>
#include <sys/dsl_bookmark.h>
#include <sys/zfeature.h>
#include <sys/bqueue.h>
#include <sys/zvol.h>
#include <sys/policy.h>
#include <sys/objlist.h>
#ifdef _KERNEL
#include <sys/zfs_vfsops.h>
#endif

/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
static int zfs_send_corrupt_data = B_FALSE;
/*
 * This tunable controls the amount of data (measured in bytes) that will be
 * prefetched by zfs send. If the main thread is blocking on reads that haven't
 * completed, this variable might need to be increased. If instead the main
 * thread is issuing new reads because the prefetches have fallen out of the
 * cache, this may need to be decreased.
 */
static uint_t zfs_send_queue_length = SPA_MAXBLOCKSIZE;
/*
 * This tunable controls the length of the queues that zfs send worker threads
 * use to communicate. If the send_main_thread is blocking on these queues,
 * this variable may need to be increased. If there is a significant slowdown
 * at the start of a send as these threads consume all the available IO
 * resources, this variable may need to be decreased.
 */
static uint_t zfs_send_no_prefetch_queue_length = 1024 * 1024;
/*
 * These tunables control the fill fraction of the queues by zfs send.
 * The fill fraction controls the frequency with which threads have to be
 * cv_signaled. If a lot of cpu time is being spent on cv_signal, then these
 * should be tuned down. If the queues empty before the signalled thread can
 * catch up, then these should be tuned up.
 */
static uint_t zfs_send_queue_ff = 20;
static uint_t zfs_send_no_prefetch_queue_ff = 20;

/*
 * Use this to override the recordsize calculation for fast zfs send estimates.
 */
static uint_t zfs_override_estimate_recordsize = 0;

/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
static const boolean_t zfs_send_set_freerecords_bit = B_TRUE;

/* Set this tunable to FALSE to disable sending unmodified spill blocks. */
static int zfs_send_unmodified_spill_blocks = B_TRUE;

static inline boolean_t
overflow_multiply(uint64_t a, uint64_t b, uint64_t *c)
{
	uint64_t temp = a * b;
	if (b != 0 && temp / b != a)
		return (B_FALSE);
	*c = temp;
	return (B_TRUE);
}

struct send_thread_arg {
	bqueue_t q;
	objset_t *os;		/* Objset to traverse */
	uint64_t fromtxg;	/* Traverse from this txg */
	int flags;		/* flags to pass to traverse_dataset */
	int error_code;
	boolean_t cancel;
	zbookmark_phys_t resume;
	uint64_t *num_blocks_visited;
};

struct redact_list_thread_arg {
	boolean_t cancel;
	bqueue_t q;
	zbookmark_phys_t resume;
	redaction_list_t *rl;
	boolean_t mark_redact;
	int error_code;
	uint64_t *num_blocks_visited;
};

struct send_merge_thread_arg {
	bqueue_t q;
	objset_t *os;
	struct redact_list_thread_arg *from_arg;
	struct send_thread_arg *to_arg;
	struct redact_list_thread_arg *redact_arg;
	int error;
	boolean_t cancel;
};

struct send_range {
	boolean_t eos_marker; /* Marks the end of the stream */
	uint64_t object;
	uint64_t start_blkid;
	uint64_t end_blkid;
	bqueue_node_t ln;
	enum type {DATA, HOLE, OBJECT, OBJECT_RANGE, REDACT,
	    PREVIOUSLY_REDACTED} type;
	union {
		struct srd {
			dmu_object_type_t obj_type;
			uint32_t datablksz; // logical size
			uint32_t datasz; // payload size
			blkptr_t bp;
			arc_buf_t *abuf;
			abd_t *abd;
			kmutex_t lock;
			kcondvar_t cv;
			boolean_t io_outstanding;
			boolean_t io_compressed;
			int io_err;
		} data;
		struct srh {
			uint32_t datablksz;
		} hole;
		struct sro {
			/*
			 * This is a pointer because embedding it in the
			 * struct causes these structures to be massively larger
			 * for all range types; this makes the code much less
			 * memory efficient.
			 */
			dnode_phys_t *dnp;
			blkptr_t bp;
			/* Piggyback unmodified spill block */
			struct send_range *spill_range;
		} object;
		struct srr {
			uint32_t datablksz;
		} redact;
		struct sror {
			blkptr_t bp;
		} object_range;
	} sru;
};

/*
 * The list of data whose inclusion in a send stream can be pending from
 * one call to backup_cb to another. Multiple calls to dump_free(),
 * dump_freeobjects(), and dump_redact() can be aggregated into a single
 * DRR_FREE, DRR_FREEOBJECTS, or DRR_REDACT replay record.
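 *
 * For example, dump_free() calls for adjacent ranges of the same object
 * (say offsets 0-128K and 128K-256K) are merged into a single pending
 * DRR_FREE record covering 256K; the pending record is only written out
 * once it can no longer be extended, e.g. when a non-adjacent range or a
 * different record type arrives.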
201 */ 202 typedef enum { 203 PENDING_NONE, 204 PENDING_FREE, 205 PENDING_FREEOBJECTS, 206 PENDING_REDACT 207 } dmu_pendop_t; 208 209 typedef struct dmu_send_cookie { 210 dmu_replay_record_t *dsc_drr; 211 dmu_send_outparams_t *dsc_dso; 212 offset_t *dsc_off; 213 objset_t *dsc_os; 214 zio_cksum_t dsc_zc; 215 uint64_t dsc_toguid; 216 uint64_t dsc_fromtxg; 217 int dsc_err; 218 dmu_pendop_t dsc_pending_op; 219 uint64_t dsc_featureflags; 220 uint64_t dsc_last_data_object; 221 uint64_t dsc_last_data_offset; 222 uint64_t dsc_resume_object; 223 uint64_t dsc_resume_offset; 224 boolean_t dsc_sent_begin; 225 boolean_t dsc_sent_end; 226 } dmu_send_cookie_t; 227 228 static int do_dump(dmu_send_cookie_t *dscp, struct send_range *range); 229 230 static void 231 range_free(struct send_range *range) 232 { 233 if (range->type == OBJECT) { 234 size_t size = sizeof (dnode_phys_t) * 235 (range->sru.object.dnp->dn_extra_slots + 1); 236 kmem_free(range->sru.object.dnp, size); 237 if (range->sru.object.spill_range) 238 range_free(range->sru.object.spill_range); 239 } else if (range->type == DATA) { 240 mutex_enter(&range->sru.data.lock); 241 while (range->sru.data.io_outstanding) 242 cv_wait(&range->sru.data.cv, &range->sru.data.lock); 243 if (range->sru.data.abd != NULL) 244 abd_free(range->sru.data.abd); 245 if (range->sru.data.abuf != NULL) { 246 arc_buf_destroy(range->sru.data.abuf, 247 &range->sru.data.abuf); 248 } 249 mutex_exit(&range->sru.data.lock); 250 251 cv_destroy(&range->sru.data.cv); 252 mutex_destroy(&range->sru.data.lock); 253 } 254 kmem_free(range, sizeof (*range)); 255 } 256 257 /* 258 * For all record types except BEGIN, fill in the checksum (overlaid in 259 * drr_u.drr_checksum.drr_checksum). The checksum verifies everything 260 * up to the start of the checksum itself. 261 */ 262 static int 263 dump_record(dmu_send_cookie_t *dscp, void *payload, int payload_len) 264 { 265 dmu_send_outparams_t *dso = dscp->dsc_dso; 266 ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), 267 ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t)); 268 (void) fletcher_4_incremental_native(dscp->dsc_drr, 269 offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), 270 &dscp->dsc_zc); 271 if (dscp->dsc_drr->drr_type == DRR_BEGIN) { 272 dscp->dsc_sent_begin = B_TRUE; 273 } else { 274 ASSERT(ZIO_CHECKSUM_IS_ZERO(&dscp->dsc_drr->drr_u. 275 drr_checksum.drr_checksum)); 276 dscp->dsc_drr->drr_u.drr_checksum.drr_checksum = dscp->dsc_zc; 277 } 278 if (dscp->dsc_drr->drr_type == DRR_END) { 279 dscp->dsc_sent_end = B_TRUE; 280 } 281 (void) fletcher_4_incremental_native(&dscp->dsc_drr-> 282 drr_u.drr_checksum.drr_checksum, 283 sizeof (zio_cksum_t), &dscp->dsc_zc); 284 *dscp->dsc_off += sizeof (dmu_replay_record_t); 285 dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, dscp->dsc_drr, 286 sizeof (dmu_replay_record_t), dso->dso_arg); 287 if (dscp->dsc_err != 0) 288 return (SET_ERROR(EINTR)); 289 if (payload_len != 0) { 290 *dscp->dsc_off += payload_len; 291 /* 292 * payload is null when dso_dryrun == B_TRUE (i.e. when we're 293 * doing a send size calculation) 294 */ 295 if (payload != NULL) { 296 (void) fletcher_4_incremental_native( 297 payload, payload_len, &dscp->dsc_zc); 298 } 299 300 /* 301 * The code does not rely on this (len being a multiple of 8). 302 * We keep this assertion because of the corresponding assertion 303 * in receive_read(). 
Keeping this assertion ensures that we do 304 * not inadvertently break backwards compatibility (causing the 305 * assertion in receive_read() to trigger on old software). 306 * 307 * Raw sends cannot be received on old software, and so can 308 * bypass this assertion. 309 */ 310 311 ASSERT((payload_len % 8 == 0) || 312 (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW)); 313 314 dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, payload, 315 payload_len, dso->dso_arg); 316 if (dscp->dsc_err != 0) 317 return (SET_ERROR(EINTR)); 318 } 319 return (0); 320 } 321 322 /* 323 * Fill in the drr_free struct, or perform aggregation if the previous record is 324 * also a free record, and the two are adjacent. 325 * 326 * Note that we send free records even for a full send, because we want to be 327 * able to receive a full send as a clone, which requires a list of all the free 328 * and freeobject records that were generated on the source. 329 */ 330 static int 331 dump_free(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset, 332 uint64_t length) 333 { 334 struct drr_free *drrf = &(dscp->dsc_drr->drr_u.drr_free); 335 336 /* 337 * When we receive a free record, dbuf_free_range() assumes 338 * that the receiving system doesn't have any dbufs in the range 339 * being freed. This is always true because there is a one-record 340 * constraint: we only send one WRITE record for any given 341 * object,offset. We know that the one-record constraint is 342 * true because we always send data in increasing order by 343 * object,offset. 344 * 345 * If the increasing-order constraint ever changes, we should find 346 * another way to assert that the one-record constraint is still 347 * satisfied. 348 */ 349 ASSERT(object > dscp->dsc_last_data_object || 350 (object == dscp->dsc_last_data_object && 351 offset > dscp->dsc_last_data_offset)); 352 353 /* 354 * If there is a pending op, but it's not PENDING_FREE, push it out, 355 * since free block aggregation can only be done for blocks of the 356 * same type (i.e., DRR_FREE records can only be aggregated with 357 * other DRR_FREE records. DRR_FREEOBJECTS records can only be 358 * aggregated with other DRR_FREEOBJECTS records). 359 */ 360 if (dscp->dsc_pending_op != PENDING_NONE && 361 dscp->dsc_pending_op != PENDING_FREE) { 362 if (dump_record(dscp, NULL, 0) != 0) 363 return (SET_ERROR(EINTR)); 364 dscp->dsc_pending_op = PENDING_NONE; 365 } 366 367 if (dscp->dsc_pending_op == PENDING_FREE) { 368 /* 369 * Check to see whether this free block can be aggregated 370 * with pending one. 371 */ 372 if (drrf->drr_object == object && drrf->drr_offset + 373 drrf->drr_length == offset) { 374 if (offset + length < offset || length == UINT64_MAX) 375 drrf->drr_length = UINT64_MAX; 376 else 377 drrf->drr_length += length; 378 return (0); 379 } else { 380 /* not a continuation. 
Push out pending record */ 381 if (dump_record(dscp, NULL, 0) != 0) 382 return (SET_ERROR(EINTR)); 383 dscp->dsc_pending_op = PENDING_NONE; 384 } 385 } 386 /* create a FREE record and make it pending */ 387 memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t)); 388 dscp->dsc_drr->drr_type = DRR_FREE; 389 drrf->drr_object = object; 390 drrf->drr_offset = offset; 391 if (offset + length < offset) 392 drrf->drr_length = DMU_OBJECT_END; 393 else 394 drrf->drr_length = length; 395 drrf->drr_toguid = dscp->dsc_toguid; 396 if (length == DMU_OBJECT_END) { 397 if (dump_record(dscp, NULL, 0) != 0) 398 return (SET_ERROR(EINTR)); 399 } else { 400 dscp->dsc_pending_op = PENDING_FREE; 401 } 402 403 return (0); 404 } 405 406 /* 407 * Fill in the drr_redact struct, or perform aggregation if the previous record 408 * is also a redaction record, and the two are adjacent. 409 */ 410 static int 411 dump_redact(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset, 412 uint64_t length) 413 { 414 struct drr_redact *drrr = &dscp->dsc_drr->drr_u.drr_redact; 415 416 /* 417 * If there is a pending op, but it's not PENDING_REDACT, push it out, 418 * since free block aggregation can only be done for blocks of the 419 * same type (i.e., DRR_REDACT records can only be aggregated with 420 * other DRR_REDACT records). 421 */ 422 if (dscp->dsc_pending_op != PENDING_NONE && 423 dscp->dsc_pending_op != PENDING_REDACT) { 424 if (dump_record(dscp, NULL, 0) != 0) 425 return (SET_ERROR(EINTR)); 426 dscp->dsc_pending_op = PENDING_NONE; 427 } 428 429 if (dscp->dsc_pending_op == PENDING_REDACT) { 430 /* 431 * Check to see whether this redacted block can be aggregated 432 * with pending one. 433 */ 434 if (drrr->drr_object == object && drrr->drr_offset + 435 drrr->drr_length == offset) { 436 drrr->drr_length += length; 437 return (0); 438 } else { 439 /* not a continuation. Push out pending record */ 440 if (dump_record(dscp, NULL, 0) != 0) 441 return (SET_ERROR(EINTR)); 442 dscp->dsc_pending_op = PENDING_NONE; 443 } 444 } 445 /* create a REDACT record and make it pending */ 446 memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t)); 447 dscp->dsc_drr->drr_type = DRR_REDACT; 448 drrr->drr_object = object; 449 drrr->drr_offset = offset; 450 drrr->drr_length = length; 451 drrr->drr_toguid = dscp->dsc_toguid; 452 dscp->dsc_pending_op = PENDING_REDACT; 453 454 return (0); 455 } 456 457 static int 458 dmu_dump_write(dmu_send_cookie_t *dscp, dmu_object_type_t type, uint64_t object, 459 uint64_t offset, int lsize, int psize, const blkptr_t *bp, 460 boolean_t io_compressed, void *data) 461 { 462 uint64_t payload_size; 463 boolean_t raw = (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW); 464 struct drr_write *drrw = &(dscp->dsc_drr->drr_u.drr_write); 465 466 /* 467 * We send data in increasing object, offset order. 468 * See comment in dump_free() for details. 469 */ 470 ASSERT(object > dscp->dsc_last_data_object || 471 (object == dscp->dsc_last_data_object && 472 offset > dscp->dsc_last_data_offset)); 473 dscp->dsc_last_data_object = object; 474 dscp->dsc_last_data_offset = offset + lsize - 1; 475 476 /* 477 * If there is any kind of pending aggregation (currently either 478 * a grouping of free objects or free blocks), push it out to 479 * the stream, since aggregation can't be done across operations 480 * of different types. 
481 */ 482 if (dscp->dsc_pending_op != PENDING_NONE) { 483 if (dump_record(dscp, NULL, 0) != 0) 484 return (SET_ERROR(EINTR)); 485 dscp->dsc_pending_op = PENDING_NONE; 486 } 487 /* write a WRITE record */ 488 memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t)); 489 dscp->dsc_drr->drr_type = DRR_WRITE; 490 drrw->drr_object = object; 491 drrw->drr_type = type; 492 drrw->drr_offset = offset; 493 drrw->drr_toguid = dscp->dsc_toguid; 494 drrw->drr_logical_size = lsize; 495 496 /* only set the compression fields if the buf is compressed or raw */ 497 boolean_t compressed = 498 (bp != NULL ? BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF && 499 io_compressed : lsize != psize); 500 if (raw || compressed) { 501 ASSERT(bp != NULL); 502 ASSERT(raw || dscp->dsc_featureflags & 503 DMU_BACKUP_FEATURE_COMPRESSED); 504 ASSERT(!BP_IS_EMBEDDED(bp)); 505 ASSERT3S(psize, >, 0); 506 507 if (raw) { 508 ASSERT(BP_IS_PROTECTED(bp)); 509 510 /* 511 * This is a raw protected block so we need to pass 512 * along everything the receiving side will need to 513 * interpret this block, including the byteswap, salt, 514 * IV, and MAC. 515 */ 516 if (BP_SHOULD_BYTESWAP(bp)) 517 drrw->drr_flags |= DRR_RAW_BYTESWAP; 518 zio_crypt_decode_params_bp(bp, drrw->drr_salt, 519 drrw->drr_iv); 520 zio_crypt_decode_mac_bp(bp, drrw->drr_mac); 521 } else { 522 /* this is a compressed block */ 523 ASSERT(dscp->dsc_featureflags & 524 DMU_BACKUP_FEATURE_COMPRESSED); 525 ASSERT(!BP_SHOULD_BYTESWAP(bp)); 526 ASSERT(!DMU_OT_IS_METADATA(BP_GET_TYPE(bp))); 527 ASSERT3U(BP_GET_COMPRESS(bp), !=, ZIO_COMPRESS_OFF); 528 ASSERT3S(lsize, >=, psize); 529 } 530 531 /* set fields common to compressed and raw sends */ 532 drrw->drr_compressiontype = BP_GET_COMPRESS(bp); 533 drrw->drr_compressed_size = psize; 534 payload_size = drrw->drr_compressed_size; 535 } else { 536 payload_size = drrw->drr_logical_size; 537 } 538 539 if (bp == NULL || BP_IS_EMBEDDED(bp) || (BP_IS_PROTECTED(bp) && !raw)) { 540 /* 541 * There's no pre-computed checksum for partial-block writes, 542 * embedded BP's, or encrypted BP's that are being sent as 543 * plaintext, so (like fletcher4-checksummed blocks) userland 544 * will have to compute a dedup-capable checksum itself. 
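		 *
		 * Otherwise the checksum type and DDT key fields (lsize,
		 * psize, compression, crypt flag, and the bp's own checksum)
		 * are copied from the block pointer in the else branch below,
		 * and DRR_CHECKSUM_DEDUP is set when the on-disk checksum
		 * algorithm is dedup-capable.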
545 */ 546 drrw->drr_checksumtype = ZIO_CHECKSUM_OFF; 547 } else { 548 drrw->drr_checksumtype = BP_GET_CHECKSUM(bp); 549 if (zio_checksum_table[drrw->drr_checksumtype].ci_flags & 550 ZCHECKSUM_FLAG_DEDUP) 551 drrw->drr_flags |= DRR_CHECKSUM_DEDUP; 552 DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp)); 553 DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp)); 554 DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp)); 555 DDK_SET_CRYPT(&drrw->drr_key, BP_IS_PROTECTED(bp)); 556 drrw->drr_key.ddk_cksum = bp->blk_cksum; 557 } 558 559 if (dump_record(dscp, data, payload_size) != 0) 560 return (SET_ERROR(EINTR)); 561 return (0); 562 } 563 564 static int 565 dump_write_embedded(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset, 566 int blksz, const blkptr_t *bp) 567 { 568 char buf[BPE_PAYLOAD_SIZE]; 569 struct drr_write_embedded *drrw = 570 &(dscp->dsc_drr->drr_u.drr_write_embedded); 571 572 if (dscp->dsc_pending_op != PENDING_NONE) { 573 if (dump_record(dscp, NULL, 0) != 0) 574 return (SET_ERROR(EINTR)); 575 dscp->dsc_pending_op = PENDING_NONE; 576 } 577 578 ASSERT(BP_IS_EMBEDDED(bp)); 579 580 memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t)); 581 dscp->dsc_drr->drr_type = DRR_WRITE_EMBEDDED; 582 drrw->drr_object = object; 583 drrw->drr_offset = offset; 584 drrw->drr_length = blksz; 585 drrw->drr_toguid = dscp->dsc_toguid; 586 drrw->drr_compression = BP_GET_COMPRESS(bp); 587 drrw->drr_etype = BPE_GET_ETYPE(bp); 588 drrw->drr_lsize = BPE_GET_LSIZE(bp); 589 drrw->drr_psize = BPE_GET_PSIZE(bp); 590 591 decode_embedded_bp_compressed(bp, buf); 592 593 uint32_t psize = drrw->drr_psize; 594 uint32_t rsize = P2ROUNDUP(psize, 8); 595 596 if (psize != rsize) 597 memset(buf + psize, 0, rsize - psize); 598 599 if (dump_record(dscp, buf, rsize) != 0) 600 return (SET_ERROR(EINTR)); 601 return (0); 602 } 603 604 static int 605 dump_spill(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object, 606 void *data) 607 { 608 struct drr_spill *drrs = &(dscp->dsc_drr->drr_u.drr_spill); 609 uint64_t blksz = BP_GET_LSIZE(bp); 610 uint64_t payload_size = blksz; 611 612 if (dscp->dsc_pending_op != PENDING_NONE) { 613 if (dump_record(dscp, NULL, 0) != 0) 614 return (SET_ERROR(EINTR)); 615 dscp->dsc_pending_op = PENDING_NONE; 616 } 617 618 /* write a SPILL record */ 619 memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t)); 620 dscp->dsc_drr->drr_type = DRR_SPILL; 621 drrs->drr_object = object; 622 drrs->drr_length = blksz; 623 drrs->drr_toguid = dscp->dsc_toguid; 624 625 /* See comment in piggyback_unmodified_spill() for full details */ 626 if (zfs_send_unmodified_spill_blocks && 627 (BP_GET_LOGICAL_BIRTH(bp) <= dscp->dsc_fromtxg)) { 628 drrs->drr_flags |= DRR_SPILL_UNMODIFIED; 629 } 630 631 /* handle raw send fields */ 632 if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) { 633 ASSERT(BP_IS_PROTECTED(bp)); 634 635 if (BP_SHOULD_BYTESWAP(bp)) 636 drrs->drr_flags |= DRR_RAW_BYTESWAP; 637 drrs->drr_compressiontype = BP_GET_COMPRESS(bp); 638 drrs->drr_compressed_size = BP_GET_PSIZE(bp); 639 zio_crypt_decode_params_bp(bp, drrs->drr_salt, drrs->drr_iv); 640 zio_crypt_decode_mac_bp(bp, drrs->drr_mac); 641 payload_size = drrs->drr_compressed_size; 642 } 643 644 if (dump_record(dscp, data, payload_size) != 0) 645 return (SET_ERROR(EINTR)); 646 return (0); 647 } 648 649 static int 650 dump_freeobjects(dmu_send_cookie_t *dscp, uint64_t firstobj, uint64_t numobjs) 651 { 652 struct drr_freeobjects *drrfo = &(dscp->dsc_drr->drr_u.drr_freeobjects); 653 uint64_t maxobj = DNODES_PER_BLOCK * 654 
	    (DMU_META_DNODE(dscp->dsc_os)->dn_maxblkid + 1);

	/*
	 * ZoL < 0.7 does not handle large FREEOBJECTS records correctly,
	 * leading to zfs recv never completing. To avoid this issue, don't
	 * send FREEOBJECTS records for object IDs which cannot exist on the
	 * receiving side.
	 */
	if (maxobj > 0) {
		if (maxobj <= firstobj)
			return (0);

		if (maxobj < firstobj + numobjs)
			numobjs = maxobj - firstobj;
	}

	/*
	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
	 * push it out, since free block aggregation can only be done for
	 * blocks of the same type (i.e., DRR_FREE records can only be
	 * aggregated with other DRR_FREE records. DRR_FREEOBJECTS records
	 * can only be aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dscp->dsc_pending_op != PENDING_NONE &&
	    dscp->dsc_pending_op != PENDING_FREEOBJECTS) {
		if (dump_record(dscp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dscp->dsc_pending_op = PENDING_NONE;
	}

	if (dscp->dsc_pending_op == PENDING_FREEOBJECTS) {
		/*
		 * See whether this free object array can be aggregated
		 * with the pending one.
		 */
		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
			drrfo->drr_numobjs += numobjs;
			return (0);
		} else {
			/* can't be aggregated. Push out pending record */
			if (dump_record(dscp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dscp->dsc_pending_op = PENDING_NONE;
		}
	}

	/* write a FREEOBJECTS record */
	memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));
	dscp->dsc_drr->drr_type = DRR_FREEOBJECTS;
	drrfo->drr_firstobj = firstobj;
	drrfo->drr_numobjs = numobjs;
	drrfo->drr_toguid = dscp->dsc_toguid;

	dscp->dsc_pending_op = PENDING_FREEOBJECTS;

	return (0);
}

static int
dump_dnode(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,
    dnode_phys_t *dnp)
{
	struct drr_object *drro = &(dscp->dsc_drr->drr_u.drr_object);
	int bonuslen;

	if (object < dscp->dsc_resume_object) {
		/*
		 * Note: when resuming, we will visit all the dnodes in
		 * the block of dnodes that we are resuming from. In
		 * this case it's unnecessary to send the dnodes prior to
		 * the one we are resuming from. We should be at most one
		 * block's worth of dnodes behind the resume point.
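		 *
		 * (That is at most 1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT)
		 * objects, i.e. 32 with the default 512-byte dnodes, which is
		 * what the assertion below checks.)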
726 */ 727 ASSERT3U(dscp->dsc_resume_object - object, <, 728 1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT)); 729 return (0); 730 } 731 732 if (dnp == NULL || dnp->dn_type == DMU_OT_NONE) 733 return (dump_freeobjects(dscp, object, 1)); 734 735 if (dscp->dsc_pending_op != PENDING_NONE) { 736 if (dump_record(dscp, NULL, 0) != 0) 737 return (SET_ERROR(EINTR)); 738 dscp->dsc_pending_op = PENDING_NONE; 739 } 740 741 /* write an OBJECT record */ 742 memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t)); 743 dscp->dsc_drr->drr_type = DRR_OBJECT; 744 drro->drr_object = object; 745 drro->drr_type = dnp->dn_type; 746 drro->drr_bonustype = dnp->dn_bonustype; 747 drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT; 748 drro->drr_bonuslen = dnp->dn_bonuslen; 749 drro->drr_dn_slots = dnp->dn_extra_slots + 1; 750 drro->drr_checksumtype = dnp->dn_checksum; 751 drro->drr_compress = dnp->dn_compress; 752 drro->drr_toguid = dscp->dsc_toguid; 753 754 if (!(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) && 755 drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE) 756 drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE; 757 758 bonuslen = P2ROUNDUP(dnp->dn_bonuslen, 8); 759 760 if ((dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW)) { 761 ASSERT(BP_IS_ENCRYPTED(bp)); 762 763 if (BP_SHOULD_BYTESWAP(bp)) 764 drro->drr_flags |= DRR_RAW_BYTESWAP; 765 766 /* needed for reconstructing dnp on recv side */ 767 drro->drr_maxblkid = dnp->dn_maxblkid; 768 drro->drr_indblkshift = dnp->dn_indblkshift; 769 drro->drr_nlevels = dnp->dn_nlevels; 770 drro->drr_nblkptr = dnp->dn_nblkptr; 771 772 /* 773 * Since we encrypt the entire bonus area, the (raw) part 774 * beyond the bonuslen is actually nonzero, so we need 775 * to send it. 776 */ 777 if (bonuslen != 0) { 778 if (drro->drr_bonuslen > DN_MAX_BONUS_LEN(dnp)) 779 return (SET_ERROR(EINVAL)); 780 drro->drr_raw_bonuslen = DN_MAX_BONUS_LEN(dnp); 781 bonuslen = drro->drr_raw_bonuslen; 782 } 783 } 784 785 /* 786 * DRR_OBJECT_SPILL is set for every dnode which references a 787 * spill block. This allows the receiving pool to definitively 788 * determine when a spill block should be kept or freed. 789 */ 790 if (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR) 791 drro->drr_flags |= DRR_OBJECT_SPILL; 792 793 if (dump_record(dscp, DN_BONUS(dnp), bonuslen) != 0) 794 return (SET_ERROR(EINTR)); 795 796 /* Free anything past the end of the file. 
*/ 797 if (dump_free(dscp, object, (dnp->dn_maxblkid + 1) * 798 (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), DMU_OBJECT_END) != 0) 799 return (SET_ERROR(EINTR)); 800 801 if (dscp->dsc_err != 0) 802 return (SET_ERROR(EINTR)); 803 804 return (0); 805 } 806 807 static int 808 dump_object_range(dmu_send_cookie_t *dscp, const blkptr_t *bp, 809 uint64_t firstobj, uint64_t numslots) 810 { 811 struct drr_object_range *drror = 812 &(dscp->dsc_drr->drr_u.drr_object_range); 813 814 /* we only use this record type for raw sends */ 815 ASSERT(BP_IS_PROTECTED(bp)); 816 ASSERT(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW); 817 ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF); 818 ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_DNODE); 819 ASSERT0(BP_GET_LEVEL(bp)); 820 821 if (dscp->dsc_pending_op != PENDING_NONE) { 822 if (dump_record(dscp, NULL, 0) != 0) 823 return (SET_ERROR(EINTR)); 824 dscp->dsc_pending_op = PENDING_NONE; 825 } 826 827 memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t)); 828 dscp->dsc_drr->drr_type = DRR_OBJECT_RANGE; 829 drror->drr_firstobj = firstobj; 830 drror->drr_numslots = numslots; 831 drror->drr_toguid = dscp->dsc_toguid; 832 if (BP_SHOULD_BYTESWAP(bp)) 833 drror->drr_flags |= DRR_RAW_BYTESWAP; 834 zio_crypt_decode_params_bp(bp, drror->drr_salt, drror->drr_iv); 835 zio_crypt_decode_mac_bp(bp, drror->drr_mac); 836 837 if (dump_record(dscp, NULL, 0) != 0) 838 return (SET_ERROR(EINTR)); 839 return (0); 840 } 841 842 static boolean_t 843 send_do_embed(const blkptr_t *bp, uint64_t featureflags) 844 { 845 if (!BP_IS_EMBEDDED(bp)) 846 return (B_FALSE); 847 848 /* 849 * Compression function must be legacy, or explicitly enabled. 850 */ 851 if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS && 852 !(featureflags & DMU_BACKUP_FEATURE_LZ4))) 853 return (B_FALSE); 854 855 /* 856 * If we have not set the ZSTD feature flag, we can't send ZSTD 857 * compressed embedded blocks, as the receiver may not support them. 858 */ 859 if ((BP_GET_COMPRESS(bp) == ZIO_COMPRESS_ZSTD && 860 !(featureflags & DMU_BACKUP_FEATURE_ZSTD))) 861 return (B_FALSE); 862 863 /* 864 * Embed type must be explicitly enabled. 865 */ 866 switch (BPE_GET_ETYPE(bp)) { 867 case BP_EMBEDDED_TYPE_DATA: 868 if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) 869 return (B_TRUE); 870 break; 871 default: 872 return (B_FALSE); 873 } 874 return (B_FALSE); 875 } 876 877 /* 878 * This function actually handles figuring out what kind of record needs to be 879 * dumped, and calling the appropriate helper function. In most cases, 880 * the data has already been read by send_reader_thread(). 
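 *
 * Each range type maps to a record emitter below: OBJECT -> dump_dnode(),
 * OBJECT_RANGE -> dump_object_range() (raw sends only), REDACT ->
 * dump_redact(), DATA -> dump_write_embedded(), dump_spill() or
 * dmu_dump_write(), and HOLE -> dump_freeobjects() (for meta-dnode holes)
 * or dump_free().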
881 */ 882 static int 883 do_dump(dmu_send_cookie_t *dscp, struct send_range *range) 884 { 885 int err = 0; 886 switch (range->type) { 887 case OBJECT: 888 err = dump_dnode(dscp, &range->sru.object.bp, range->object, 889 range->sru.object.dnp); 890 /* Dump piggybacked unmodified spill block */ 891 if (!err && range->sru.object.spill_range) 892 err = do_dump(dscp, range->sru.object.spill_range); 893 return (err); 894 case OBJECT_RANGE: { 895 ASSERT3U(range->start_blkid + 1, ==, range->end_blkid); 896 if (!(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW)) { 897 return (0); 898 } 899 uint64_t epb = BP_GET_LSIZE(&range->sru.object_range.bp) >> 900 DNODE_SHIFT; 901 uint64_t firstobj = range->start_blkid * epb; 902 err = dump_object_range(dscp, &range->sru.object_range.bp, 903 firstobj, epb); 904 break; 905 } 906 case REDACT: { 907 struct srr *srrp = &range->sru.redact; 908 err = dump_redact(dscp, range->object, range->start_blkid * 909 srrp->datablksz, (range->end_blkid - range->start_blkid) * 910 srrp->datablksz); 911 return (err); 912 } 913 case DATA: { 914 struct srd *srdp = &range->sru.data; 915 blkptr_t *bp = &srdp->bp; 916 spa_t *spa = 917 dmu_objset_spa(dscp->dsc_os); 918 919 ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp)); 920 ASSERT3U(range->start_blkid + 1, ==, range->end_blkid); 921 922 if (send_do_embed(bp, dscp->dsc_featureflags)) { 923 err = dump_write_embedded(dscp, range->object, 924 range->start_blkid * srdp->datablksz, 925 srdp->datablksz, bp); 926 return (err); 927 } 928 ASSERT(range->object > dscp->dsc_resume_object || 929 (range->object == dscp->dsc_resume_object && 930 (range->start_blkid == DMU_SPILL_BLKID || 931 range->start_blkid * srdp->datablksz >= 932 dscp->dsc_resume_offset))); 933 /* it's a level-0 block of a regular object */ 934 935 mutex_enter(&srdp->lock); 936 while (srdp->io_outstanding) 937 cv_wait(&srdp->cv, &srdp->lock); 938 err = srdp->io_err; 939 mutex_exit(&srdp->lock); 940 941 if (err != 0) { 942 if (zfs_send_corrupt_data && 943 !dscp->dsc_dso->dso_dryrun) { 944 /* 945 * Send a block filled with 0x"zfs badd bloc" 946 */ 947 srdp->abuf = arc_alloc_buf(spa, &srdp->abuf, 948 ARC_BUFC_DATA, srdp->datablksz); 949 uint64_t *ptr; 950 for (ptr = srdp->abuf->b_data; 951 (char *)ptr < (char *)srdp->abuf->b_data + 952 srdp->datablksz; ptr++) 953 *ptr = 0x2f5baddb10cULL; 954 } else { 955 return (SET_ERROR(EIO)); 956 } 957 } 958 959 ASSERT(dscp->dsc_dso->dso_dryrun || 960 srdp->abuf != NULL || srdp->abd != NULL); 961 962 char *data = NULL; 963 if (srdp->abd != NULL) { 964 data = abd_to_buf(srdp->abd); 965 ASSERT0P(srdp->abuf); 966 } else if (srdp->abuf != NULL) { 967 data = srdp->abuf->b_data; 968 } 969 970 if (BP_GET_TYPE(bp) == DMU_OT_SA) { 971 ASSERT3U(range->start_blkid, ==, DMU_SPILL_BLKID); 972 err = dump_spill(dscp, bp, range->object, data); 973 return (err); 974 } 975 976 uint64_t offset = range->start_blkid * srdp->datablksz; 977 978 /* 979 * If we have large blocks stored on disk but the send flags 980 * don't allow us to send large blocks, we split the data from 981 * the arc buf into chunks. 
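		 *
		 * For example, without DMU_BACKUP_FEATURE_LARGE_BLOCKS a 1M
		 * block is emitted as eight 128K (SPA_OLD_MAXBLOCKSIZE)
		 * DRR_WRITE records; the NULL bp passed to dmu_dump_write()
		 * in that loop also disables the compressed-send path for
		 * those chunks.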
982 */ 983 if (srdp->datablksz > SPA_OLD_MAXBLOCKSIZE && 984 !(dscp->dsc_featureflags & 985 DMU_BACKUP_FEATURE_LARGE_BLOCKS)) { 986 while (srdp->datablksz > 0 && err == 0) { 987 int n = MIN(srdp->datablksz, 988 SPA_OLD_MAXBLOCKSIZE); 989 err = dmu_dump_write(dscp, srdp->obj_type, 990 range->object, offset, n, n, NULL, B_FALSE, 991 data); 992 offset += n; 993 /* 994 * When doing dry run, data==NULL is used as a 995 * sentinel value by 996 * dmu_dump_write()->dump_record(). 997 */ 998 if (data != NULL) 999 data += n; 1000 srdp->datablksz -= n; 1001 } 1002 } else { 1003 err = dmu_dump_write(dscp, srdp->obj_type, 1004 range->object, offset, 1005 srdp->datablksz, srdp->datasz, bp, 1006 srdp->io_compressed, data); 1007 } 1008 return (err); 1009 } 1010 case HOLE: { 1011 struct srh *srhp = &range->sru.hole; 1012 if (range->object == DMU_META_DNODE_OBJECT) { 1013 uint32_t span = srhp->datablksz >> DNODE_SHIFT; 1014 uint64_t first_obj = range->start_blkid * span; 1015 uint64_t numobj = range->end_blkid * span - first_obj; 1016 return (dump_freeobjects(dscp, first_obj, numobj)); 1017 } 1018 uint64_t offset = 0; 1019 1020 /* 1021 * If this multiply overflows, we don't need to send this block. 1022 * Even if it has a birth time, it can never not be a hole, so 1023 * we don't need to send records for it. 1024 */ 1025 if (!overflow_multiply(range->start_blkid, srhp->datablksz, 1026 &offset)) { 1027 return (0); 1028 } 1029 uint64_t len = 0; 1030 1031 if (!overflow_multiply(range->end_blkid, srhp->datablksz, &len)) 1032 len = UINT64_MAX; 1033 len = len - offset; 1034 return (dump_free(dscp, range->object, offset, len)); 1035 } 1036 default: 1037 panic("Invalid range type in do_dump: %d", range->type); 1038 } 1039 return (err); 1040 } 1041 1042 static struct send_range * 1043 range_alloc(enum type type, uint64_t object, uint64_t start_blkid, 1044 uint64_t end_blkid, boolean_t eos) 1045 { 1046 struct send_range *range = kmem_alloc(sizeof (*range), KM_SLEEP); 1047 range->type = type; 1048 range->object = object; 1049 range->start_blkid = start_blkid; 1050 range->end_blkid = end_blkid; 1051 range->eos_marker = eos; 1052 if (type == DATA) { 1053 range->sru.data.abd = NULL; 1054 range->sru.data.abuf = NULL; 1055 mutex_init(&range->sru.data.lock, NULL, MUTEX_DEFAULT, NULL); 1056 cv_init(&range->sru.data.cv, NULL, CV_DEFAULT, NULL); 1057 range->sru.data.io_outstanding = 0; 1058 range->sru.data.io_err = 0; 1059 range->sru.data.io_compressed = B_FALSE; 1060 } else if (type == OBJECT) { 1061 range->sru.object.spill_range = NULL; 1062 } 1063 return (range); 1064 } 1065 1066 /* 1067 * This is the callback function to traverse_dataset that acts as a worker 1068 * thread for dmu_send_impl. 1069 */ 1070 static int 1071 send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp, 1072 const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg) 1073 { 1074 (void) zilog; 1075 struct send_thread_arg *sta = arg; 1076 struct send_range *record; 1077 1078 ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT || 1079 zb->zb_object >= sta->resume.zb_object); 1080 1081 /* 1082 * All bps of an encrypted os should have the encryption bit set. 1083 * If this is not true it indicates tampering and we report an error. 
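	 *
	 * (Holes are exempt from this check, since a hole carries no data
	 * that could have been encrypted.)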
1084 */ 1085 if (sta->os->os_encrypted && 1086 !BP_IS_HOLE(bp) && !BP_USES_CRYPT(bp)) { 1087 spa_log_error(spa, zb, BP_GET_PHYSICAL_BIRTH(bp)); 1088 return (SET_ERROR(EIO)); 1089 } 1090 1091 if (sta->cancel) 1092 return (SET_ERROR(EINTR)); 1093 if (zb->zb_object != DMU_META_DNODE_OBJECT && 1094 DMU_OBJECT_IS_SPECIAL(zb->zb_object)) 1095 return (0); 1096 atomic_inc_64(sta->num_blocks_visited); 1097 1098 if (zb->zb_level == ZB_DNODE_LEVEL) { 1099 if (zb->zb_object == DMU_META_DNODE_OBJECT) 1100 return (0); 1101 record = range_alloc(OBJECT, zb->zb_object, 0, 0, B_FALSE); 1102 record->sru.object.bp = *bp; 1103 size_t size = sizeof (*dnp) * (dnp->dn_extra_slots + 1); 1104 record->sru.object.dnp = kmem_alloc(size, KM_SLEEP); 1105 memcpy(record->sru.object.dnp, dnp, size); 1106 bqueue_enqueue(&sta->q, record, sizeof (*record)); 1107 return (0); 1108 } 1109 if (zb->zb_level == 0 && zb->zb_object == DMU_META_DNODE_OBJECT && 1110 !BP_IS_HOLE(bp)) { 1111 record = range_alloc(OBJECT_RANGE, 0, zb->zb_blkid, 1112 zb->zb_blkid + 1, B_FALSE); 1113 record->sru.object_range.bp = *bp; 1114 bqueue_enqueue(&sta->q, record, sizeof (*record)); 1115 return (0); 1116 } 1117 if (zb->zb_level < 0 || (zb->zb_level > 0 && !BP_IS_HOLE(bp))) 1118 return (0); 1119 if (zb->zb_object == DMU_META_DNODE_OBJECT && !BP_IS_HOLE(bp)) 1120 return (0); 1121 1122 uint64_t span = bp_span_in_blocks(dnp->dn_indblkshift, zb->zb_level); 1123 uint64_t start; 1124 1125 /* 1126 * If this multiply overflows, we don't need to send this block. 1127 * Even if it has a birth time, it can never not be a hole, so 1128 * we don't need to send records for it. 1129 */ 1130 if (!overflow_multiply(span, zb->zb_blkid, &start) || (!(zb->zb_blkid == 1131 DMU_SPILL_BLKID || DMU_OT_IS_METADATA(dnp->dn_type)) && 1132 span * zb->zb_blkid > dnp->dn_maxblkid)) { 1133 ASSERT(BP_IS_HOLE(bp)); 1134 return (0); 1135 } 1136 1137 if (zb->zb_blkid == DMU_SPILL_BLKID) 1138 ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA); 1139 1140 enum type record_type = DATA; 1141 if (BP_IS_HOLE(bp)) 1142 record_type = HOLE; 1143 else if (BP_IS_REDACTED(bp)) 1144 record_type = REDACT; 1145 else 1146 record_type = DATA; 1147 1148 record = range_alloc(record_type, zb->zb_object, start, 1149 (start + span < start ? 0 : start + span), B_FALSE); 1150 1151 uint64_t datablksz = (zb->zb_blkid == DMU_SPILL_BLKID ? 
	    BP_GET_LSIZE(bp) : dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT);

	if (BP_IS_HOLE(bp)) {
		record->sru.hole.datablksz = datablksz;
	} else if (BP_IS_REDACTED(bp)) {
		record->sru.redact.datablksz = datablksz;
	} else {
		record->sru.data.datablksz = datablksz;
		record->sru.data.obj_type = dnp->dn_type;
		record->sru.data.bp = *bp;
	}

	bqueue_enqueue(&sta->q, record, sizeof (*record));
	return (0);
}

struct redact_list_cb_arg {
	uint64_t *num_blocks_visited;
	bqueue_t *q;
	boolean_t *cancel;
	boolean_t mark_redact;
};

static int
redact_list_cb(redact_block_phys_t *rb, void *arg)
{
	struct redact_list_cb_arg *rlcap = arg;

	atomic_inc_64(rlcap->num_blocks_visited);
	if (*rlcap->cancel)
		return (-1);

	struct send_range *data = range_alloc(REDACT, rb->rbp_object,
	    rb->rbp_blkid, rb->rbp_blkid + redact_block_get_count(rb), B_FALSE);
	ASSERT3U(data->end_blkid, >, rb->rbp_blkid);
	if (rlcap->mark_redact) {
		data->type = REDACT;
		data->sru.redact.datablksz = redact_block_get_size(rb);
	} else {
		data->type = PREVIOUSLY_REDACTED;
	}
	bqueue_enqueue(rlcap->q, data, sizeof (*data));

	return (0);
}

/*
 * This function kicks off the traverse_dataset. It also handles setting the
 * error code of the thread in case something goes wrong, and pushes the End of
 * Stream record when the traverse_dataset call has finished.
 */
static __attribute__((noreturn)) void
send_traverse_thread(void *arg)
{
	struct send_thread_arg *st_arg = arg;
	int err = 0;
	struct send_range *data;
	fstrans_cookie_t cookie = spl_fstrans_mark();

	err = traverse_dataset_resume(st_arg->os->os_dsl_dataset,
	    st_arg->fromtxg, &st_arg->resume,
	    st_arg->flags | TRAVERSE_LOGICAL, send_cb, st_arg);

	if (err != EINTR)
		st_arg->error_code = err;
	data = range_alloc(DATA, 0, 0, 0, B_TRUE);
	bqueue_enqueue_flush(&st_arg->q, data, sizeof (*data));
	spl_fstrans_unmark(cookie);
	thread_exit();
}

/*
 * Utility function that causes End of Stream records to compare after all
 * others, so that other threads' comparison logic can stay simple.
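 *
 * The return value is -1 if 'from' sorts entirely before 'to', 1 if it
 * sorts entirely after, and 0 if the two ranges overlap. Ranges in the
 * meta-dnode (object 0) are first converted to the span of object IDs
 * their dnode blocks cover, so they compare sensibly against per-object
 * ranges.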
1226 */ 1227 static int __attribute__((unused)) 1228 send_range_after(const struct send_range *from, const struct send_range *to) 1229 { 1230 if (from->eos_marker == B_TRUE) 1231 return (1); 1232 if (to->eos_marker == B_TRUE) 1233 return (-1); 1234 1235 uint64_t from_obj = from->object; 1236 uint64_t from_end_obj = from->object + 1; 1237 uint64_t to_obj = to->object; 1238 uint64_t to_end_obj = to->object + 1; 1239 if (from_obj == 0) { 1240 ASSERT(from->type == HOLE || from->type == OBJECT_RANGE); 1241 from_obj = from->start_blkid << DNODES_PER_BLOCK_SHIFT; 1242 from_end_obj = from->end_blkid << DNODES_PER_BLOCK_SHIFT; 1243 } 1244 if (to_obj == 0) { 1245 ASSERT(to->type == HOLE || to->type == OBJECT_RANGE); 1246 to_obj = to->start_blkid << DNODES_PER_BLOCK_SHIFT; 1247 to_end_obj = to->end_blkid << DNODES_PER_BLOCK_SHIFT; 1248 } 1249 1250 if (from_end_obj <= to_obj) 1251 return (-1); 1252 if (from_obj >= to_end_obj) 1253 return (1); 1254 int64_t cmp = TREE_CMP(to->type == OBJECT_RANGE, from->type == 1255 OBJECT_RANGE); 1256 if (unlikely(cmp)) 1257 return (cmp); 1258 cmp = TREE_CMP(to->type == OBJECT, from->type == OBJECT); 1259 if (unlikely(cmp)) 1260 return (cmp); 1261 if (from->end_blkid <= to->start_blkid) 1262 return (-1); 1263 if (from->start_blkid >= to->end_blkid) 1264 return (1); 1265 return (0); 1266 } 1267 1268 /* 1269 * Pop the new data off the queue, check that the records we receive are in 1270 * the right order, but do not free the old data. This is used so that the 1271 * records can be sent on to the main thread without copying the data. 1272 */ 1273 static struct send_range * 1274 get_next_range_nofree(bqueue_t *bq, struct send_range *prev) 1275 { 1276 struct send_range *next = bqueue_dequeue(bq); 1277 ASSERT3S(send_range_after(prev, next), ==, -1); 1278 return (next); 1279 } 1280 1281 /* 1282 * Pop the new data off the queue, check that the records we receive are in 1283 * the right order, and free the old data. 1284 */ 1285 static struct send_range * 1286 get_next_range(bqueue_t *bq, struct send_range *prev) 1287 { 1288 struct send_range *next = get_next_range_nofree(bq, prev); 1289 range_free(prev); 1290 return (next); 1291 } 1292 1293 static __attribute__((noreturn)) void 1294 redact_list_thread(void *arg) 1295 { 1296 struct redact_list_thread_arg *rlt_arg = arg; 1297 struct send_range *record; 1298 fstrans_cookie_t cookie = spl_fstrans_mark(); 1299 if (rlt_arg->rl != NULL) { 1300 struct redact_list_cb_arg rlcba = {0}; 1301 rlcba.cancel = &rlt_arg->cancel; 1302 rlcba.q = &rlt_arg->q; 1303 rlcba.num_blocks_visited = rlt_arg->num_blocks_visited; 1304 rlcba.mark_redact = rlt_arg->mark_redact; 1305 int err = dsl_redaction_list_traverse(rlt_arg->rl, 1306 &rlt_arg->resume, redact_list_cb, &rlcba); 1307 if (err != EINTR) 1308 rlt_arg->error_code = err; 1309 } 1310 record = range_alloc(DATA, 0, 0, 0, B_TRUE); 1311 bqueue_enqueue_flush(&rlt_arg->q, record, sizeof (*record)); 1312 spl_fstrans_unmark(cookie); 1313 1314 thread_exit(); 1315 } 1316 1317 /* 1318 * Compare the start point of the two provided ranges. End of stream ranges 1319 * compare last, objects compare before any data or hole inside that object and 1320 * multi-object holes that start at the same object. 
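 *
 * As with send_range_after(), ranges in object 0 (the meta-dnode) are
 * compared by the object IDs their blocks cover (DNODES_PER_BLOCK per
 * block). At equal starting points, OBJECT_RANGE records sort first, then
 * OBJECT records, then data and holes.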
1321 */ 1322 static int 1323 send_range_start_compare(struct send_range *r1, struct send_range *r2) 1324 { 1325 uint64_t r1_objequiv = r1->object; 1326 uint64_t r1_l0equiv = r1->start_blkid; 1327 uint64_t r2_objequiv = r2->object; 1328 uint64_t r2_l0equiv = r2->start_blkid; 1329 int64_t cmp = TREE_CMP(r1->eos_marker, r2->eos_marker); 1330 if (unlikely(cmp)) 1331 return (cmp); 1332 if (r1->object == 0) { 1333 r1_objequiv = r1->start_blkid * DNODES_PER_BLOCK; 1334 r1_l0equiv = 0; 1335 } 1336 if (r2->object == 0) { 1337 r2_objequiv = r2->start_blkid * DNODES_PER_BLOCK; 1338 r2_l0equiv = 0; 1339 } 1340 1341 cmp = TREE_CMP(r1_objequiv, r2_objequiv); 1342 if (likely(cmp)) 1343 return (cmp); 1344 cmp = TREE_CMP(r2->type == OBJECT_RANGE, r1->type == OBJECT_RANGE); 1345 if (unlikely(cmp)) 1346 return (cmp); 1347 cmp = TREE_CMP(r2->type == OBJECT, r1->type == OBJECT); 1348 if (unlikely(cmp)) 1349 return (cmp); 1350 1351 return (TREE_CMP(r1_l0equiv, r2_l0equiv)); 1352 } 1353 1354 enum q_idx { 1355 REDACT_IDX = 0, 1356 TO_IDX, 1357 FROM_IDX, 1358 NUM_THREADS 1359 }; 1360 1361 /* 1362 * This function returns the next range the send_merge_thread should operate on. 1363 * The inputs are two arrays; the first one stores the range at the front of the 1364 * queues stored in the second one. The ranges are sorted in descending 1365 * priority order; the metadata from earlier ranges overrules metadata from 1366 * later ranges. out_mask is used to return which threads the ranges came from; 1367 * bit i is set if ranges[i] started at the same place as the returned range. 1368 * 1369 * This code is not hardcoded to compare a specific number of threads; it could 1370 * be used with any number, just by changing the q_idx enum. 1371 * 1372 * The "next range" is the one with the earliest start; if two starts are equal, 1373 * the highest-priority range is the next to operate on. If a higher-priority 1374 * range starts in the middle of the first range, then the first range will be 1375 * truncated to end where the higher-priority range starts, and we will operate 1376 * on that one next time. In this way, we make sure that each block covered by 1377 * some range gets covered by a returned range, and each block covered is 1378 * returned using the metadata of the highest-priority range it appears in. 1379 * 1380 * For example, if the three ranges at the front of the queues were [2,4), 1381 * [3,5), and [1,3), then the ranges returned would be [1,2) with the metadata 1382 * from the third range, [2,4) with the metadata from the first range, and then 1383 * [4,5) with the metadata from the second. 1384 */ 1385 static struct send_range * 1386 find_next_range(struct send_range **ranges, bqueue_t **qs, uint64_t *out_mask) 1387 { 1388 int idx = 0; // index of the range with the earliest start 1389 int i; 1390 uint64_t bmask = 0; 1391 for (i = 1; i < NUM_THREADS; i++) { 1392 if (send_range_start_compare(ranges[i], ranges[idx]) < 0) 1393 idx = i; 1394 } 1395 if (ranges[idx]->eos_marker) { 1396 struct send_range *ret = range_alloc(DATA, 0, 0, 0, B_TRUE); 1397 *out_mask = 0; 1398 return (ret); 1399 } 1400 /* 1401 * Find all the ranges that start at that same point. 1402 */ 1403 for (i = 0; i < NUM_THREADS; i++) { 1404 if (send_range_start_compare(ranges[i], ranges[idx]) == 0) 1405 bmask |= 1 << i; 1406 } 1407 *out_mask = bmask; 1408 /* 1409 * OBJECT_RANGE records only come from the TO thread, and should always 1410 * be treated as overlapping with nothing and sent on immediately. 
They 1411 * are only used in raw sends, and are never redacted. 1412 */ 1413 if (ranges[idx]->type == OBJECT_RANGE) { 1414 ASSERT3U(idx, ==, TO_IDX); 1415 ASSERT3U(*out_mask, ==, 1 << TO_IDX); 1416 struct send_range *ret = ranges[idx]; 1417 ranges[idx] = get_next_range_nofree(qs[idx], ranges[idx]); 1418 return (ret); 1419 } 1420 /* 1421 * Find the first start or end point after the start of the first range. 1422 */ 1423 uint64_t first_change = ranges[idx]->end_blkid; 1424 for (i = 0; i < NUM_THREADS; i++) { 1425 if (i == idx || ranges[i]->eos_marker || 1426 ranges[i]->object > ranges[idx]->object || 1427 ranges[i]->object == DMU_META_DNODE_OBJECT) 1428 continue; 1429 ASSERT3U(ranges[i]->object, ==, ranges[idx]->object); 1430 if (first_change > ranges[i]->start_blkid && 1431 (bmask & (1 << i)) == 0) 1432 first_change = ranges[i]->start_blkid; 1433 else if (first_change > ranges[i]->end_blkid) 1434 first_change = ranges[i]->end_blkid; 1435 } 1436 /* 1437 * Update all ranges to no longer overlap with the range we're 1438 * returning. All such ranges must start at the same place as the range 1439 * being returned, and end at or after first_change. Thus we update 1440 * their start to first_change. If that makes them size 0, then free 1441 * them and pull a new range from that thread. 1442 */ 1443 for (i = 0; i < NUM_THREADS; i++) { 1444 if (i == idx || (bmask & (1 << i)) == 0) 1445 continue; 1446 ASSERT3U(first_change, >, ranges[i]->start_blkid); 1447 ranges[i]->start_blkid = first_change; 1448 ASSERT3U(ranges[i]->start_blkid, <=, ranges[i]->end_blkid); 1449 if (ranges[i]->start_blkid == ranges[i]->end_blkid) 1450 ranges[i] = get_next_range(qs[i], ranges[i]); 1451 } 1452 /* 1453 * Short-circuit the simple case; if the range doesn't overlap with 1454 * anything else, or it only overlaps with things that start at the same 1455 * place and are longer, send it on. 1456 */ 1457 if (first_change == ranges[idx]->end_blkid) { 1458 struct send_range *ret = ranges[idx]; 1459 ranges[idx] = get_next_range_nofree(qs[idx], ranges[idx]); 1460 return (ret); 1461 } 1462 1463 /* 1464 * Otherwise, return a truncated copy of ranges[idx] and move the start 1465 * of ranges[idx] back to first_change. 1466 */ 1467 struct send_range *ret = kmem_alloc(sizeof (*ret), KM_SLEEP); 1468 *ret = *ranges[idx]; 1469 ret->end_blkid = first_change; 1470 ranges[idx]->start_blkid = first_change; 1471 return (ret); 1472 } 1473 1474 #define FROM_AND_REDACT_BITS ((1 << REDACT_IDX) | (1 << FROM_IDX)) 1475 1476 /* 1477 * Merge the results from the from thread and the to thread, and then hand the 1478 * records off to send_prefetch_thread to prefetch them. If this is not a 1479 * send from a redaction bookmark, the from thread will push an end of stream 1480 * record and stop, and we'll just send everything that was changed in the 1481 * to_ds since the ancestor's creation txg. If it is, then since 1482 * traverse_dataset has a canonical order, we can compare each change as 1483 * they're pulled off the queues. That will give us a stream that is 1484 * appropriately sorted, and covers all records. In addition, we pull the 1485 * data from the redact_list_thread and use that to determine which blocks 1486 * should be redacted. 
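 *
 * The front of each input queue is kept in priority order (REDACT_IDX,
 * then TO_IDX, then FROM_IDX), so when ranges overlap, find_next_range()
 * lets the higher-priority range's metadata win. Any range that appears in
 * both the from bookmark and the redaction list is dropped here, since the
 * receiving system already has it redacted. (The prefetching itself is
 * done by send_reader_thread(), which consumes this thread's output
 * queue.)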
1487 */ 1488 static __attribute__((noreturn)) void 1489 send_merge_thread(void *arg) 1490 { 1491 struct send_merge_thread_arg *smt_arg = arg; 1492 struct send_range *front_ranges[NUM_THREADS]; 1493 bqueue_t *queues[NUM_THREADS]; 1494 int err = 0; 1495 fstrans_cookie_t cookie = spl_fstrans_mark(); 1496 1497 if (smt_arg->redact_arg == NULL) { 1498 front_ranges[REDACT_IDX] = 1499 kmem_zalloc(sizeof (struct send_range), KM_SLEEP); 1500 front_ranges[REDACT_IDX]->eos_marker = B_TRUE; 1501 front_ranges[REDACT_IDX]->type = REDACT; 1502 queues[REDACT_IDX] = NULL; 1503 } else { 1504 front_ranges[REDACT_IDX] = 1505 bqueue_dequeue(&smt_arg->redact_arg->q); 1506 queues[REDACT_IDX] = &smt_arg->redact_arg->q; 1507 } 1508 front_ranges[TO_IDX] = bqueue_dequeue(&smt_arg->to_arg->q); 1509 queues[TO_IDX] = &smt_arg->to_arg->q; 1510 front_ranges[FROM_IDX] = bqueue_dequeue(&smt_arg->from_arg->q); 1511 queues[FROM_IDX] = &smt_arg->from_arg->q; 1512 uint64_t mask = 0; 1513 struct send_range *range; 1514 for (range = find_next_range(front_ranges, queues, &mask); 1515 !range->eos_marker && err == 0 && !smt_arg->cancel; 1516 range = find_next_range(front_ranges, queues, &mask)) { 1517 /* 1518 * If the range in question was in both the from redact bookmark 1519 * and the bookmark we're using to redact, then don't send it. 1520 * It's already redacted on the receiving system, so a redaction 1521 * record would be redundant. 1522 */ 1523 if ((mask & FROM_AND_REDACT_BITS) == FROM_AND_REDACT_BITS) { 1524 ASSERT3U(range->type, ==, REDACT); 1525 range_free(range); 1526 continue; 1527 } 1528 bqueue_enqueue(&smt_arg->q, range, sizeof (*range)); 1529 1530 if (smt_arg->to_arg->error_code != 0) { 1531 err = smt_arg->to_arg->error_code; 1532 } else if (smt_arg->from_arg->error_code != 0) { 1533 err = smt_arg->from_arg->error_code; 1534 } else if (smt_arg->redact_arg != NULL && 1535 smt_arg->redact_arg->error_code != 0) { 1536 err = smt_arg->redact_arg->error_code; 1537 } 1538 } 1539 if (smt_arg->cancel && err == 0) 1540 err = SET_ERROR(EINTR); 1541 smt_arg->error = err; 1542 if (smt_arg->error != 0) { 1543 smt_arg->to_arg->cancel = B_TRUE; 1544 smt_arg->from_arg->cancel = B_TRUE; 1545 if (smt_arg->redact_arg != NULL) 1546 smt_arg->redact_arg->cancel = B_TRUE; 1547 } 1548 for (int i = 0; i < NUM_THREADS; i++) { 1549 while (!front_ranges[i]->eos_marker) { 1550 front_ranges[i] = get_next_range(queues[i], 1551 front_ranges[i]); 1552 } 1553 range_free(front_ranges[i]); 1554 } 1555 range->eos_marker = B_TRUE; 1556 bqueue_enqueue_flush(&smt_arg->q, range, 1); 1557 spl_fstrans_unmark(cookie); 1558 thread_exit(); 1559 } 1560 1561 struct send_reader_thread_arg { 1562 struct send_merge_thread_arg *smta; 1563 bqueue_t q; 1564 boolean_t cancel; 1565 boolean_t issue_reads; 1566 uint64_t featureflags; 1567 int error; 1568 }; 1569 1570 static void 1571 dmu_send_read_done(zio_t *zio) 1572 { 1573 struct send_range *range = zio->io_private; 1574 1575 mutex_enter(&range->sru.data.lock); 1576 if (zio->io_error != 0) { 1577 abd_free(range->sru.data.abd); 1578 range->sru.data.abd = NULL; 1579 range->sru.data.io_err = zio->io_error; 1580 } 1581 1582 ASSERT(range->sru.data.io_outstanding); 1583 range->sru.data.io_outstanding = B_FALSE; 1584 cv_broadcast(&range->sru.data.cv); 1585 mutex_exit(&range->sru.data.lock); 1586 } 1587 1588 static void 1589 issue_data_read(struct send_reader_thread_arg *srta, struct send_range *range) 1590 { 1591 struct srd *srdp = &range->sru.data; 1592 blkptr_t *bp = &srdp->bp; 1593 objset_t *os = srta->smta->os; 1594 1595 
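	/*
	 * Work out how this DATA range's payload will be read (raw,
	 * compressed, or logical), and if issue_reads is set kick off an ARC
	 * or direct zio read now so the data is ready by the time do_dump()
	 * reaches this range.
	 */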
ASSERT3U(range->type, ==, DATA); 1596 ASSERT3U(range->start_blkid + 1, ==, range->end_blkid); 1597 /* 1598 * If we have large blocks stored on disk but 1599 * the send flags don't allow us to send large 1600 * blocks, we split the data from the arc buf 1601 * into chunks. 1602 */ 1603 boolean_t split_large_blocks = 1604 srdp->datablksz > SPA_OLD_MAXBLOCKSIZE && 1605 !(srta->featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS); 1606 /* 1607 * We should only request compressed data from the ARC if all 1608 * the following are true: 1609 * - stream compression was requested 1610 * - we aren't splitting large blocks into smaller chunks 1611 * - the data won't need to be byteswapped before sending 1612 * - this isn't an embedded block 1613 * - this isn't metadata (if receiving on a different endian 1614 * system it can be byteswapped more easily) 1615 */ 1616 boolean_t request_compressed = 1617 (srta->featureflags & DMU_BACKUP_FEATURE_COMPRESSED) && 1618 !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) && 1619 !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp)); 1620 1621 zio_flag_t zioflags = ZIO_FLAG_CANFAIL; 1622 1623 if (srta->featureflags & DMU_BACKUP_FEATURE_RAW) { 1624 zioflags |= ZIO_FLAG_RAW; 1625 srdp->io_compressed = B_TRUE; 1626 } else if (request_compressed) { 1627 zioflags |= ZIO_FLAG_RAW_COMPRESS; 1628 srdp->io_compressed = B_TRUE; 1629 } 1630 1631 srdp->datasz = (zioflags & ZIO_FLAG_RAW_COMPRESS) ? 1632 BP_GET_PSIZE(bp) : BP_GET_LSIZE(bp); 1633 1634 if (!srta->issue_reads) 1635 return; 1636 if (BP_IS_REDACTED(bp)) 1637 return; 1638 if (send_do_embed(bp, srta->featureflags)) 1639 return; 1640 1641 zbookmark_phys_t zb = { 1642 .zb_objset = dmu_objset_id(os), 1643 .zb_object = range->object, 1644 .zb_level = 0, 1645 .zb_blkid = range->start_blkid, 1646 }; 1647 1648 arc_flags_t aflags = ARC_FLAG_CACHED_ONLY; 1649 1650 int arc_err = arc_read(NULL, os->os_spa, bp, 1651 arc_getbuf_func, &srdp->abuf, ZIO_PRIORITY_ASYNC_READ, 1652 zioflags, &aflags, &zb); 1653 /* 1654 * If the data is not already cached in the ARC, we read directly 1655 * from zio. This avoids the performance overhead of adding a new 1656 * entry to the ARC, and we also avoid polluting the ARC cache with 1657 * data that is not likely to be used in the future. 1658 */ 1659 if (arc_err != 0) { 1660 srdp->abd = abd_alloc_linear(srdp->datasz, B_FALSE); 1661 srdp->io_outstanding = B_TRUE; 1662 zio_nowait(zio_read(NULL, os->os_spa, bp, srdp->abd, 1663 srdp->datasz, dmu_send_read_done, range, 1664 ZIO_PRIORITY_ASYNC_READ, zioflags, &zb)); 1665 } 1666 } 1667 1668 /* 1669 * Create a new record with the given values. 1670 */ 1671 static void 1672 enqueue_range(struct send_reader_thread_arg *srta, bqueue_t *q, dnode_t *dn, 1673 uint64_t blkid, uint64_t count, const blkptr_t *bp, uint32_t datablksz) 1674 { 1675 enum type range_type = (bp == NULL || BP_IS_HOLE(bp) ? HOLE : 1676 (BP_IS_REDACTED(bp) ? 
REDACT : DATA)); 1677 1678 struct send_range *range = range_alloc(range_type, dn->dn_object, 1679 blkid, blkid + count, B_FALSE); 1680 1681 if (blkid == DMU_SPILL_BLKID) { 1682 ASSERT3P(bp, !=, NULL); 1683 ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA); 1684 } 1685 1686 switch (range_type) { 1687 case HOLE: 1688 range->sru.hole.datablksz = datablksz; 1689 break; 1690 case DATA: 1691 ASSERT3U(count, ==, 1); 1692 range->sru.data.datablksz = datablksz; 1693 range->sru.data.obj_type = dn->dn_type; 1694 range->sru.data.bp = *bp; 1695 issue_data_read(srta, range); 1696 break; 1697 case REDACT: 1698 range->sru.redact.datablksz = datablksz; 1699 break; 1700 default: 1701 break; 1702 } 1703 bqueue_enqueue(q, range, datablksz); 1704 } 1705 1706 /* 1707 * Send DRR_SPILL records for unmodified spill blocks. This is useful 1708 * because changing certain attributes of the object (e.g. blocksize) 1709 * can cause old versions of ZFS to incorrectly remove a spill block. 1710 * Including these records in the stream forces an up to date version 1711 * to always be written ensuring they're never lost. Current versions 1712 * of the code which understand the DRR_FLAG_SPILL_BLOCK feature can 1713 * ignore these unmodified spill blocks. 1714 * 1715 * We piggyback the spill_range to dnode range instead of enqueueing it 1716 * so send_range_after won't complain. 1717 */ 1718 static uint64_t 1719 piggyback_unmodified_spill(struct send_reader_thread_arg *srta, 1720 struct send_range *range) 1721 { 1722 ASSERT3U(range->type, ==, OBJECT); 1723 1724 dnode_phys_t *dnp = range->sru.object.dnp; 1725 uint64_t fromtxg = srta->smta->to_arg->fromtxg; 1726 1727 if (!zfs_send_unmodified_spill_blocks || 1728 !(dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR) || 1729 !(BP_GET_LOGICAL_BIRTH(DN_SPILL_BLKPTR(dnp)) <= fromtxg)) 1730 return (0); 1731 1732 blkptr_t *bp = DN_SPILL_BLKPTR(dnp); 1733 struct send_range *spill_range = range_alloc(DATA, range->object, 1734 DMU_SPILL_BLKID, DMU_SPILL_BLKID+1, B_FALSE); 1735 spill_range->sru.data.bp = *bp; 1736 spill_range->sru.data.obj_type = dnp->dn_type; 1737 spill_range->sru.data.datablksz = BP_GET_LSIZE(bp); 1738 1739 issue_data_read(srta, spill_range); 1740 range->sru.object.spill_range = spill_range; 1741 1742 return (BP_GET_LSIZE(bp)); 1743 } 1744 1745 /* 1746 * This thread is responsible for two things: First, it retrieves the correct 1747 * blkptr in the to ds if we need to send the data because of something from 1748 * the from thread. As a result of this, we're the first ones to discover that 1749 * some indirect blocks can be discarded because they're not holes. Second, 1750 * it issues prefetches for the data we need to send. 1751 */ 1752 static __attribute__((noreturn)) void 1753 send_reader_thread(void *arg) 1754 { 1755 struct send_reader_thread_arg *srta = arg; 1756 struct send_merge_thread_arg *smta = srta->smta; 1757 bqueue_t *inq = &smta->q; 1758 bqueue_t *outq = &srta->q; 1759 objset_t *os = smta->os; 1760 fstrans_cookie_t cookie = spl_fstrans_mark(); 1761 struct send_range *range = bqueue_dequeue(inq); 1762 int err = 0; 1763 1764 /* 1765 * If the record we're analyzing is from a redaction bookmark from the 1766 * fromds, then we need to know whether or not it exists in the tods so 1767 * we know whether to create records for it or not. If it does, we need 1768 * the datablksz so we can generate an appropriate record for it. 1769 * Finally, if it isn't redacted, we need the blkptr so that we can send 1770 * a WRITE record containing the actual data. 
1771 */ 1772 uint64_t last_obj = UINT64_MAX; 1773 uint64_t last_obj_exists = B_TRUE; 1774 while (!range->eos_marker && !srta->cancel && smta->error == 0 && 1775 err == 0) { 1776 uint64_t spill = 0; 1777 switch (range->type) { 1778 case DATA: 1779 issue_data_read(srta, range); 1780 bqueue_enqueue(outq, range, range->sru.data.datablksz); 1781 range = get_next_range_nofree(inq, range); 1782 break; 1783 case OBJECT: 1784 spill = piggyback_unmodified_spill(srta, range); 1785 zfs_fallthrough; 1786 case HOLE: 1787 case OBJECT_RANGE: 1788 case REDACT: // Redacted blocks must exist 1789 bqueue_enqueue(outq, range, sizeof (*range) + spill); 1790 range = get_next_range_nofree(inq, range); 1791 break; 1792 case PREVIOUSLY_REDACTED: { 1793 /* 1794 * This entry came from the "from bookmark" when 1795 * sending from a bookmark that has a redaction 1796 * list. We need to check if this object/blkid 1797 * exists in the target ("to") dataset, and if 1798 * not then we drop this entry. We also need 1799 * to fill in the block pointer so that we know 1800 * what to prefetch. 1801 * 1802 * To accomplish the above, we first cache whether or 1803 * not the last object we examined exists. If it 1804 * doesn't, we can drop this record. If it does, we hold 1805 * the dnode and use it to call dbuf_dnode_findbp. We do 1806 * this instead of dbuf_bookmark_findbp because we will 1807 * often operate on large ranges, and holding the dnode 1808 * once is more efficient. 1809 */ 1810 boolean_t object_exists = B_TRUE; 1811 /* 1812 * If the data is redacted, we only care if it exists, 1813 * so that we don't send records for objects that have 1814 * been deleted. 1815 */ 1816 dnode_t *dn; 1817 if (range->object == last_obj && !last_obj_exists) { 1818 /* 1819 * If we're still examining the same object as 1820 * previously, and it doesn't exist, we don't 1821 * need to call dbuf_bookmark_findbp. 1822 */ 1823 object_exists = B_FALSE; 1824 } else { 1825 err = dnode_hold(os, range->object, FTAG, &dn); 1826 if (err == ENOENT) { 1827 object_exists = B_FALSE; 1828 err = 0; 1829 } 1830 last_obj = range->object; 1831 last_obj_exists = object_exists; 1832 } 1833 1834 if (err != 0) { 1835 break; 1836 } else if (!object_exists) { 1837 /* 1838 * The block was modified, but doesn't 1839 * exist in the to dataset; if it was 1840 * deleted in the to dataset, then we'll 1841 * visit the hole bp for it at some point. 1842 */ 1843 range = get_next_range(inq, range); 1844 continue; 1845 } 1846 uint64_t file_max = 1847 MIN(dn->dn_maxblkid, range->end_blkid); 1848 /* 1849 * The object exists, so we need to try to find the 1850 * blkptr for each block in the range we're processing. 1851 */ 1852 rw_enter(&dn->dn_struct_rwlock, RW_READER); 1853 for (uint64_t blkid = range->start_blkid; 1854 blkid < file_max; blkid++) { 1855 blkptr_t bp; 1856 uint32_t datablksz = 1857 dn->dn_phys->dn_datablkszsec << 1858 SPA_MINBLOCKSHIFT; 1859 uint64_t offset = blkid * datablksz; 1860 /* 1861 * This call finds the next non-hole block in 1862 * the object. This is to prevent a 1863 * performance problem where we're unredacting 1864 * a large hole. Using dnode_next_offset to 1865 * skip over the large hole avoids iterating 1866 * over every block in it. 
1867 */ 1868 err = dnode_next_offset(dn, DNODE_FIND_HAVELOCK, 1869 &offset, 1, 1, 0); 1870 if (err == ESRCH) { 1871 offset = UINT64_MAX; 1872 err = 0; 1873 } else if (err != 0) { 1874 break; 1875 } 1876 if (offset != blkid * datablksz) { 1877 /* 1878 * if there is a hole from here 1879 * (blkid) to offset 1880 */ 1881 offset = MIN(offset, file_max * 1882 datablksz); 1883 uint64_t nblks = (offset / datablksz) - 1884 blkid; 1885 enqueue_range(srta, outq, dn, blkid, 1886 nblks, NULL, datablksz); 1887 blkid += nblks; 1888 } 1889 if (blkid >= file_max) 1890 break; 1891 err = dbuf_dnode_findbp(dn, 0, blkid, &bp, 1892 NULL, NULL); 1893 if (err != 0) 1894 break; 1895 ASSERT(!BP_IS_HOLE(&bp)); 1896 enqueue_range(srta, outq, dn, blkid, 1, &bp, 1897 datablksz); 1898 } 1899 rw_exit(&dn->dn_struct_rwlock); 1900 dnode_rele(dn, FTAG); 1901 range = get_next_range(inq, range); 1902 } 1903 } 1904 } 1905 if (srta->cancel || err != 0) { 1906 smta->cancel = B_TRUE; 1907 srta->error = err; 1908 } else if (smta->error != 0) { 1909 srta->error = smta->error; 1910 } 1911 while (!range->eos_marker) 1912 range = get_next_range(inq, range); 1913 1914 bqueue_enqueue_flush(outq, range, 1); 1915 spl_fstrans_unmark(cookie); 1916 thread_exit(); 1917 } 1918 1919 #define NUM_SNAPS_NOT_REDACTED UINT64_MAX 1920 1921 struct dmu_send_params { 1922 /* Pool args */ 1923 const void *tag; // Tag dp was held with, will be used to release dp. 1924 dsl_pool_t *dp; 1925 /* To snapshot args */ 1926 const char *tosnap; 1927 dsl_dataset_t *to_ds; 1928 /* From snapshot args */ 1929 zfs_bookmark_phys_t ancestor_zb; 1930 uint64_t *fromredactsnaps; 1931 /* NUM_SNAPS_NOT_REDACTED if not sending from redaction bookmark */ 1932 uint64_t numfromredactsnaps; 1933 /* Stream params */ 1934 boolean_t is_clone; 1935 boolean_t embedok; 1936 boolean_t large_block_ok; 1937 boolean_t compressok; 1938 boolean_t rawok; 1939 boolean_t savedok; 1940 uint64_t resumeobj; 1941 uint64_t resumeoff; 1942 uint64_t saved_guid; 1943 zfs_bookmark_phys_t *redactbook; 1944 /* Stream output params */ 1945 dmu_send_outparams_t *dso; 1946 1947 /* Stream progress params */ 1948 offset_t *off; 1949 int outfd; 1950 char saved_toname[MAXNAMELEN]; 1951 }; 1952 1953 static int 1954 setup_featureflags(struct dmu_send_params *dspp, objset_t *os, 1955 uint64_t *featureflags) 1956 { 1957 dsl_dataset_t *to_ds = dspp->to_ds; 1958 dsl_pool_t *dp = dspp->dp; 1959 1960 if (dmu_objset_type(os) == DMU_OST_ZFS) { 1961 uint64_t version; 1962 if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) 1963 return (SET_ERROR(EINVAL)); 1964 1965 if (version >= ZPL_VERSION_SA) 1966 *featureflags |= DMU_BACKUP_FEATURE_SA_SPILL; 1967 } 1968 1969 /* raw sends imply large_block_ok */ 1970 if ((dspp->rawok || dspp->large_block_ok) && 1971 dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_BLOCKS)) { 1972 *featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS; 1973 } 1974 1975 /* encrypted datasets will not have embedded blocks */ 1976 if ((dspp->embedok || dspp->rawok) && !os->os_encrypted && 1977 spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) { 1978 *featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA; 1979 } 1980 1981 /* raw send implies compressok */ 1982 if (dspp->compressok || dspp->rawok) 1983 *featureflags |= DMU_BACKUP_FEATURE_COMPRESSED; 1984 1985 if (dspp->rawok && os->os_encrypted) 1986 *featureflags |= DMU_BACKUP_FEATURE_RAW; 1987 1988 if ((*featureflags & 1989 (DMU_BACKUP_FEATURE_EMBED_DATA | DMU_BACKUP_FEATURE_COMPRESSED | 1990 DMU_BACKUP_FEATURE_RAW)) != 0 && 1991 
spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) { 1992 *featureflags |= DMU_BACKUP_FEATURE_LZ4; 1993 } 1994 1995 /* 1996 * We specifically do not include DMU_BACKUP_FEATURE_EMBED_DATA here to 1997 * allow sending ZSTD compressed datasets to a receiver that does not 1998 * support ZSTD 1999 */ 2000 if ((*featureflags & 2001 (DMU_BACKUP_FEATURE_COMPRESSED | DMU_BACKUP_FEATURE_RAW)) != 0 && 2002 dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_ZSTD_COMPRESS)) { 2003 *featureflags |= DMU_BACKUP_FEATURE_ZSTD; 2004 } 2005 2006 if (dspp->resumeobj != 0 || dspp->resumeoff != 0) { 2007 *featureflags |= DMU_BACKUP_FEATURE_RESUMING; 2008 } 2009 2010 if (dspp->redactbook != NULL) { 2011 *featureflags |= DMU_BACKUP_FEATURE_REDACTED; 2012 } 2013 2014 if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_DNODE)) { 2015 *featureflags |= DMU_BACKUP_FEATURE_LARGE_DNODE; 2016 } 2017 2018 if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LONGNAME)) { 2019 *featureflags |= DMU_BACKUP_FEATURE_LONGNAME; 2020 } 2021 2022 if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_MICROZAP)) { 2023 /* 2024 * We must never split a large microzap block, so we can only 2025 * send large microzaps if LARGE_BLOCKS is already enabled. 2026 */ 2027 if (!(*featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS)) 2028 return (SET_ERROR(ZFS_ERR_STREAM_LARGE_MICROZAP)); 2029 *featureflags |= DMU_BACKUP_FEATURE_LARGE_MICROZAP; 2030 } 2031 2032 return (0); 2033 } 2034 2035 static dmu_replay_record_t * 2036 create_begin_record(struct dmu_send_params *dspp, objset_t *os, 2037 uint64_t featureflags) 2038 { 2039 dmu_replay_record_t *drr = kmem_zalloc(sizeof (dmu_replay_record_t), 2040 KM_SLEEP); 2041 drr->drr_type = DRR_BEGIN; 2042 2043 struct drr_begin *drrb = &drr->drr_u.drr_begin; 2044 dsl_dataset_t *to_ds = dspp->to_ds; 2045 2046 drrb->drr_magic = DMU_BACKUP_MAGIC; 2047 drrb->drr_creation_time = dsl_dataset_phys(to_ds)->ds_creation_time; 2048 drrb->drr_type = dmu_objset_type(os); 2049 drrb->drr_toguid = dsl_dataset_phys(to_ds)->ds_guid; 2050 drrb->drr_fromguid = dspp->ancestor_zb.zbm_guid; 2051 2052 DMU_SET_STREAM_HDRTYPE(drrb->drr_versioninfo, DMU_SUBSTREAM); 2053 DMU_SET_FEATUREFLAGS(drrb->drr_versioninfo, featureflags); 2054 2055 if (dspp->is_clone) 2056 drrb->drr_flags |= DRR_FLAG_CLONE; 2057 if (dsl_dataset_phys(dspp->to_ds)->ds_flags & DS_FLAG_CI_DATASET) 2058 drrb->drr_flags |= DRR_FLAG_CI_DATA; 2059 if (zfs_send_set_freerecords_bit) 2060 drrb->drr_flags |= DRR_FLAG_FREERECORDS; 2061 drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_SPILL_BLOCK; 2062 2063 if (dspp->savedok) { 2064 drrb->drr_toguid = dspp->saved_guid; 2065 strlcpy(drrb->drr_toname, dspp->saved_toname, 2066 sizeof (drrb->drr_toname)); 2067 } else { 2068 dsl_dataset_name(to_ds, drrb->drr_toname); 2069 if (!to_ds->ds_is_snapshot) { 2070 (void) strlcat(drrb->drr_toname, "@--head--", 2071 sizeof (drrb->drr_toname)); 2072 } 2073 } 2074 return (drr); 2075 } 2076 2077 static void 2078 setup_to_thread(struct send_thread_arg *to_arg, objset_t *to_os, 2079 dmu_sendstatus_t *dssp, uint64_t fromtxg, boolean_t rawok) 2080 { 2081 VERIFY0(bqueue_init(&to_arg->q, zfs_send_no_prefetch_queue_ff, 2082 MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize), 2083 offsetof(struct send_range, ln))); 2084 to_arg->error_code = 0; 2085 to_arg->cancel = B_FALSE; 2086 to_arg->os = to_os; 2087 to_arg->fromtxg = fromtxg; 2088 to_arg->flags = TRAVERSE_PRE | TRAVERSE_PREFETCH_METADATA; 2089 if (rawok) 2090 to_arg->flags |= TRAVERSE_NO_DECRYPT; 2091 if 
(zfs_send_corrupt_data) 2092 to_arg->flags |= TRAVERSE_HARD; 2093 to_arg->num_blocks_visited = &dssp->dss_blocks; 2094 (void) thread_create(NULL, 0, send_traverse_thread, to_arg, 0, 2095 curproc, TS_RUN, minclsyspri); 2096 } 2097 2098 static void 2099 setup_from_thread(struct redact_list_thread_arg *from_arg, 2100 redaction_list_t *from_rl, dmu_sendstatus_t *dssp) 2101 { 2102 VERIFY0(bqueue_init(&from_arg->q, zfs_send_no_prefetch_queue_ff, 2103 MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize), 2104 offsetof(struct send_range, ln))); 2105 from_arg->error_code = 0; 2106 from_arg->cancel = B_FALSE; 2107 from_arg->rl = from_rl; 2108 from_arg->mark_redact = B_FALSE; 2109 from_arg->num_blocks_visited = &dssp->dss_blocks; 2110 /* 2111 * If from_ds is null, send_traverse_thread just returns success and 2112 * enqueues an eos marker. 2113 */ 2114 (void) thread_create(NULL, 0, redact_list_thread, from_arg, 0, 2115 curproc, TS_RUN, minclsyspri); 2116 } 2117 2118 static void 2119 setup_redact_list_thread(struct redact_list_thread_arg *rlt_arg, 2120 struct dmu_send_params *dspp, redaction_list_t *rl, dmu_sendstatus_t *dssp) 2121 { 2122 if (dspp->redactbook == NULL) 2123 return; 2124 2125 rlt_arg->cancel = B_FALSE; 2126 VERIFY0(bqueue_init(&rlt_arg->q, zfs_send_no_prefetch_queue_ff, 2127 MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize), 2128 offsetof(struct send_range, ln))); 2129 rlt_arg->error_code = 0; 2130 rlt_arg->mark_redact = B_TRUE; 2131 rlt_arg->rl = rl; 2132 rlt_arg->num_blocks_visited = &dssp->dss_blocks; 2133 2134 (void) thread_create(NULL, 0, redact_list_thread, rlt_arg, 0, 2135 curproc, TS_RUN, minclsyspri); 2136 } 2137 2138 static void 2139 setup_merge_thread(struct send_merge_thread_arg *smt_arg, 2140 struct dmu_send_params *dspp, struct redact_list_thread_arg *from_arg, 2141 struct send_thread_arg *to_arg, struct redact_list_thread_arg *rlt_arg, 2142 objset_t *os) 2143 { 2144 VERIFY0(bqueue_init(&smt_arg->q, zfs_send_no_prefetch_queue_ff, 2145 MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize), 2146 offsetof(struct send_range, ln))); 2147 smt_arg->cancel = B_FALSE; 2148 smt_arg->error = 0; 2149 smt_arg->from_arg = from_arg; 2150 smt_arg->to_arg = to_arg; 2151 if (dspp->redactbook != NULL) 2152 smt_arg->redact_arg = rlt_arg; 2153 2154 smt_arg->os = os; 2155 (void) thread_create(NULL, 0, send_merge_thread, smt_arg, 0, curproc, 2156 TS_RUN, minclsyspri); 2157 } 2158 2159 static void 2160 setup_reader_thread(struct send_reader_thread_arg *srt_arg, 2161 struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg, 2162 uint64_t featureflags) 2163 { 2164 VERIFY0(bqueue_init(&srt_arg->q, zfs_send_queue_ff, 2165 MAX(zfs_send_queue_length, 2 * zfs_max_recordsize), 2166 offsetof(struct send_range, ln))); 2167 srt_arg->smta = smt_arg; 2168 srt_arg->issue_reads = !dspp->dso->dso_dryrun; 2169 srt_arg->featureflags = featureflags; 2170 (void) thread_create(NULL, 0, send_reader_thread, srt_arg, 0, 2171 curproc, TS_RUN, minclsyspri); 2172 } 2173 2174 static int 2175 setup_resume_points(struct dmu_send_params *dspp, 2176 struct send_thread_arg *to_arg, struct redact_list_thread_arg *from_arg, 2177 struct redact_list_thread_arg *rlt_arg, 2178 struct send_merge_thread_arg *smt_arg, boolean_t resuming, objset_t *os, 2179 redaction_list_t *redact_rl, nvlist_t *nvl) 2180 { 2181 (void) smt_arg; 2182 dsl_dataset_t *to_ds = dspp->to_ds; 2183 int err = 0; 2184 2185 uint64_t obj = 0; 2186 uint64_t blkid = 0; 2187 if (resuming) { 2188 obj = dspp->resumeobj; 2189 
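/*
 * Look up the object's data block size so the byte-based resume
 * offset can be converted into a block id below.
 */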
dmu_object_info_t to_doi; 2190 err = dmu_object_info(os, obj, &to_doi); 2191 if (err != 0) 2192 return (err); 2193 2194 blkid = dspp->resumeoff / to_doi.doi_data_block_size; 2195 } 2196 /* 2197 * If we're resuming a redacted send, we can skip to the appropriate 2198 * point in the redaction bookmark by binary searching through it. 2199 */ 2200 if (redact_rl != NULL) { 2201 SET_BOOKMARK(&rlt_arg->resume, to_ds->ds_object, obj, 0, blkid); 2202 } 2203 2204 SET_BOOKMARK(&to_arg->resume, to_ds->ds_object, obj, 0, blkid); 2205 if (nvlist_exists(nvl, BEGINNV_REDACT_FROM_SNAPS)) { 2206 uint64_t objset = dspp->ancestor_zb.zbm_redaction_obj; 2207 /* 2208 * Note: If the resume point is in an object whose 2209 * blocksize is different in the from vs to snapshots, 2210 * we will have divided by the "wrong" blocksize. 2211 * However, in this case fromsnap's send_cb() will 2212 * detect that the blocksize has changed and therefore 2213 * ignore this object. 2214 * 2215 * If we're resuming a send from a redaction bookmark, 2216 * we still cannot accidentally suggest blocks behind 2217 * the to_ds. In addition, we know that any blocks in 2218 * the object in the to_ds will have to be sent, since 2219 * the size changed. Therefore, we can't cause any harm 2220 * this way either. 2221 */ 2222 SET_BOOKMARK(&from_arg->resume, objset, obj, 0, blkid); 2223 } 2224 if (resuming) { 2225 fnvlist_add_uint64(nvl, BEGINNV_RESUME_OBJECT, dspp->resumeobj); 2226 fnvlist_add_uint64(nvl, BEGINNV_RESUME_OFFSET, dspp->resumeoff); 2227 } 2228 return (0); 2229 } 2230 2231 static dmu_sendstatus_t * 2232 setup_send_progress(struct dmu_send_params *dspp) 2233 { 2234 dmu_sendstatus_t *dssp = kmem_zalloc(sizeof (*dssp), KM_SLEEP); 2235 dssp->dss_outfd = dspp->outfd; 2236 dssp->dss_off = dspp->off; 2237 dssp->dss_proc = curproc; 2238 mutex_enter(&dspp->to_ds->ds_sendstream_lock); 2239 list_insert_head(&dspp->to_ds->ds_sendstreams, dssp); 2240 mutex_exit(&dspp->to_ds->ds_sendstream_lock); 2241 return (dssp); 2242 } 2243 2244 /* 2245 * Actually do the bulk of the work in a zfs send. 2246 * 2247 * The idea is that we want to do a send from ancestor_zb to to_ds. We also 2248 * want to not send any data that has been modified by all the datasets in 2249 * redactsnaparr, and store the list of blocks that are redacted in this way in 2250 * a bookmark named redactbook, created on the to_ds. We do this by creating 2251 * several worker threads, whose function is described below. 2252 * 2253 * There are three cases. 2254 * The first case is a redacted zfs send. In this case there are 5 threads. 2255 * The first thread is the to_ds traversal thread: it calls dataset_traverse on 2256 * the to_ds and finds all the blocks that have changed since ancestor_zb (if 2257 * it's a full send, that's all blocks in the dataset). It then sends those 2258 * blocks on to the send merge thread. The redact list thread takes the data 2259 * from the redaction bookmark and sends those blocks on to the send merge 2260 * thread. The send merge thread takes the data from the to_ds traversal 2261 * thread, and combines it with the redaction records from the redact list 2262 * thread. If a block appears in both the to_ds's data and the redaction data, 2263 * the send merge thread will mark it as redacted and send it on to the prefetch 2264 * thread. Otherwise, the send merge thread will send the block on to the 2265 * prefetch thread unchanged. 
The prefetch thread will issue prefetch reads for
2266 * any data that isn't redacted, and then send the data on to the main thread.
2267 * The main thread behaves the same as in a normal send case, issuing demand
2268 * reads for data blocks and sending out records over the network.
2269 *
2270 * The graphic below diagrams the flow of data in the case of a redacted zfs
2271 * send. Each box represents a thread, and each line represents the flow of
2272 * data.
2273 *
2274 *           Records from the |
2275 *         redaction bookmark |
2276 * +--------------------+     |  +---------------------------+
2277 * |                    |     v  | Send Merge Thread         |
2278 * | Redact List Thread +---------> Apply redaction marks to |
2279 * |                    |        | records as specified by   |
2280 * +--------------------+        | redaction ranges          |
2281 *                               +----^---------------+------+
2282 *                                    |               | Merged data
2283 *                                    |               |
2284 *                                    |  +------------v--------+
2285 *                                    |  | Prefetch Thread     |
2286 * +--------------------+             |  | Issues prefetch     |
2287 * | to_ds Traversal    |             |  | reads of data blocks|
2288 * | Thread (finds      +-------------+  +------------+--------+
2289 * | candidate blocks)  |  Blocks modified            | Prefetched data
2290 * +--------------------+  by to_ds since              |
2291 *                         ancestor_zb   +------------v----+
2292 *                                       | Main Thread     |  File Descriptor
2293 *                                       | Sends data over +->(to zfs receive)
2294 *                                       | wire            |
2295 *                                       +-----------------+
2296 *
2297 * The second case is an incremental send from a redaction bookmark. The to_ds
2298 * traversal thread and the main thread behave the same as in the redacted
2299 * send case. The new thread is the from bookmark traversal thread. It
2300 * iterates over the redaction list in the redaction bookmark, and enqueues
2301 * records for each block that was redacted in the original send. The send
2302 * merge thread now has to merge the data from the two threads. For details
2303 * about that process, see the header comment of send_merge_thread(). Any data
2304 * it decides to send on will be prefetched by the prefetch thread. Note that
2305 * you can perform a redacted send from a redaction bookmark; in that case,
2306 * the data flow behaves very similarly to the flow in the redacted send case,
2307 * except with the addition of the bookmark traversal thread iterating over the
2308 * redaction bookmark. The send_merge_thread also has to take on the
2309 * responsibility of merging the redact list thread's records, the bookmark
2310 * traversal thread's records, and the to_ds records.
2311 *
2312 * +---------------------+
2313 * |                     |
2314 * | Redact List Thread  +--------------+
2315 * |                     |              |
2316 * +---------------------+              |
2317 *        Blocks in redaction list      |  Ranges modified by every secure snap
2318 *        of from bookmark              |  (or EOS if not redacted)
2319 *                                      |
2320 * +---------------------+              |  +----v----------------------+
2321 * | bookmark Traversal  |              v  | Send Merge Thread         |
2322 * | Thread (finds       +---------------> Merges bookmark, rlt, and   |
2323 * | candidate blocks)   |                 | to_ds send records        |
2324 * +---------------------+                 +----^---------------+------+
2325 *                                              |               | Merged data
2326 *                                              |  +------------v--------+
2327 *                                              |  | Prefetch Thread     |
2328 * +--------------------+                       |  | Issues prefetch     |
2329 * | to_ds Traversal    |                       |  | reads of data blocks|
2330 * | Thread (finds      +-----------------------+  +------------+--------+
2331 * | candidate blocks)  |  Blocks modified                      | Prefetched data
2332 * +--------------------+  by to_ds since          +------------v----+
2333 *                         ancestor_zb             | Main Thread     |  File Descriptor
2334 *                                                  | Sends data over +->(to zfs receive)
2335 *                                                  | wire            |
2336 *                                                  +-----------------+
2337 *
2338 * The final case is a simple zfs full or incremental send. The to_ds traversal
2339 * thread behaves the same as always. The redact list thread is never started.
2340 * The send merge thread takes all the blocks that the to_ds traversal thread
2341 * sends it, prefetches the data, and sends the blocks on to the main thread.
2342 * The main thread sends the data over the wire.
2343 *
2344 * To keep performance acceptable, we want to prefetch the data in the worker
2345 * threads. While the to_ds thread could simply use the TRAVERSE_PREFETCH
2346 * feature built into traverse_dataset, the combining and deletion of records
2347 * due to redaction and sends from redaction bookmarks mean that we could
2348 * issue many unnecessary prefetches. As a result, we only prefetch data
2349 * after we've determined that the record is not going to be redacted. To
2350 * prevent the prefetching from getting too far ahead of the main thread, the
2351 * blocking queues that are used for communication are capped not by the
2352 * number of entries in the queue, but by the sum of the size of the
2353 * prefetches associated with them. The limit on the amount of data that the
2354 * thread can prefetch beyond what the main thread has reached is controlled
2355 * by the global variable zfs_send_queue_length. In addition, to prevent poor
2356 * performance in the beginning of a send, we also limit the distance ahead
2357 * that the traversal threads can be. That distance is controlled by the
2358 * zfs_send_no_prefetch_queue_length tunable.
2359 *
2360 * Note: Releases dp using the specified tag.
2361 */ 2362 static int 2363 dmu_send_impl(struct dmu_send_params *dspp) 2364 { 2365 objset_t *os; 2366 dmu_replay_record_t *drr; 2367 dmu_sendstatus_t *dssp; 2368 dmu_send_cookie_t dsc = {0}; 2369 int err; 2370 uint64_t fromtxg = dspp->ancestor_zb.zbm_creation_txg; 2371 uint64_t featureflags = 0; 2372 struct redact_list_thread_arg *from_arg; 2373 struct send_thread_arg *to_arg; 2374 struct redact_list_thread_arg *rlt_arg; 2375 struct send_merge_thread_arg *smt_arg; 2376 struct send_reader_thread_arg *srt_arg; 2377 struct send_range *range; 2378 redaction_list_t *from_rl = NULL; 2379 redaction_list_t *redact_rl = NULL; 2380 boolean_t resuming = (dspp->resumeobj != 0 || dspp->resumeoff != 0); 2381 boolean_t book_resuming = resuming; 2382 2383 dsl_dataset_t *to_ds = dspp->to_ds; 2384 zfs_bookmark_phys_t *ancestor_zb = &dspp->ancestor_zb; 2385 dsl_pool_t *dp = dspp->dp; 2386 const void *tag = dspp->tag; 2387 2388 err = dmu_objset_from_ds(to_ds, &os); 2389 if (err != 0) { 2390 dsl_pool_rele(dp, tag); 2391 return (err); 2392 } 2393 2394 /* 2395 * If this is a non-raw send of an encrypted ds, we can ensure that 2396 * the objset_phys_t is authenticated. This is safe because this is 2397 * either a snapshot or we have owned the dataset, ensuring that 2398 * it can't be modified. 2399 */ 2400 if (!dspp->rawok && os->os_encrypted && 2401 arc_is_unauthenticated(os->os_phys_buf)) { 2402 zbookmark_phys_t zb; 2403 2404 SET_BOOKMARK(&zb, to_ds->ds_object, ZB_ROOT_OBJECT, 2405 ZB_ROOT_LEVEL, ZB_ROOT_BLKID); 2406 err = arc_untransform(os->os_phys_buf, os->os_spa, 2407 &zb, B_FALSE); 2408 if (err != 0) { 2409 dsl_pool_rele(dp, tag); 2410 return (err); 2411 } 2412 2413 ASSERT0(arc_is_unauthenticated(os->os_phys_buf)); 2414 } 2415 2416 if ((err = setup_featureflags(dspp, os, &featureflags)) != 0) { 2417 dsl_pool_rele(dp, tag); 2418 return (err); 2419 } 2420 2421 /* 2422 * If we're doing a redacted send, hold the bookmark's redaction list. 2423 */ 2424 if (dspp->redactbook != NULL) { 2425 err = dsl_redaction_list_hold_obj(dp, 2426 dspp->redactbook->zbm_redaction_obj, FTAG, 2427 &redact_rl); 2428 if (err != 0) { 2429 dsl_pool_rele(dp, tag); 2430 return (SET_ERROR(EINVAL)); 2431 } 2432 dsl_redaction_list_long_hold(dp, redact_rl, FTAG); 2433 } 2434 2435 /* 2436 * If we're sending from a redaction bookmark, hold the redaction list 2437 * so that we can consider sending the redacted blocks. 
2438 */ 2439 if (ancestor_zb->zbm_redaction_obj != 0) { 2440 err = dsl_redaction_list_hold_obj(dp, 2441 ancestor_zb->zbm_redaction_obj, FTAG, &from_rl); 2442 if (err != 0) { 2443 if (redact_rl != NULL) { 2444 dsl_redaction_list_long_rele(redact_rl, FTAG); 2445 dsl_redaction_list_rele(redact_rl, FTAG); 2446 } 2447 dsl_pool_rele(dp, tag); 2448 return (SET_ERROR(EINVAL)); 2449 } 2450 dsl_redaction_list_long_hold(dp, from_rl, FTAG); 2451 } 2452 2453 dsl_dataset_long_hold(to_ds, FTAG); 2454 2455 from_arg = kmem_zalloc(sizeof (*from_arg), KM_SLEEP); 2456 to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP); 2457 rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP); 2458 smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP); 2459 srt_arg = kmem_zalloc(sizeof (*srt_arg), KM_SLEEP); 2460 2461 drr = create_begin_record(dspp, os, featureflags); 2462 dssp = setup_send_progress(dspp); 2463 2464 dsc.dsc_drr = drr; 2465 dsc.dsc_dso = dspp->dso; 2466 dsc.dsc_os = os; 2467 dsc.dsc_off = dspp->off; 2468 dsc.dsc_toguid = dsl_dataset_phys(to_ds)->ds_guid; 2469 dsc.dsc_fromtxg = fromtxg; 2470 dsc.dsc_pending_op = PENDING_NONE; 2471 dsc.dsc_featureflags = featureflags; 2472 dsc.dsc_resume_object = dspp->resumeobj; 2473 dsc.dsc_resume_offset = dspp->resumeoff; 2474 2475 dsl_pool_rele(dp, tag); 2476 2477 void *payload = NULL; 2478 size_t payload_len = 0; 2479 nvlist_t *nvl = fnvlist_alloc(); 2480 2481 /* 2482 * If we're doing a redacted send, we include the snapshots we're 2483 * redacted with respect to so that the target system knows what send 2484 * streams can be correctly received on top of this dataset. If we're 2485 * instead sending a redacted dataset, we include the snapshots that the 2486 * dataset was created with respect to. 2487 */ 2488 if (dspp->redactbook != NULL) { 2489 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS, 2490 redact_rl->rl_phys->rlp_snaps, 2491 redact_rl->rl_phys->rlp_num_snaps); 2492 } else if (dsl_dataset_feature_is_active(to_ds, 2493 SPA_FEATURE_REDACTED_DATASETS)) { 2494 uint64_t *tods_guids; 2495 uint64_t length; 2496 VERIFY(dsl_dataset_get_uint64_array_feature(to_ds, 2497 SPA_FEATURE_REDACTED_DATASETS, &length, &tods_guids)); 2498 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS, tods_guids, 2499 length); 2500 } 2501 2502 /* 2503 * If we're sending from a redaction bookmark, then we should retrieve 2504 * the guids of that bookmark so we can send them over the wire. 2505 */ 2506 if (from_rl != NULL) { 2507 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS, 2508 from_rl->rl_phys->rlp_snaps, 2509 from_rl->rl_phys->rlp_num_snaps); 2510 } 2511 2512 /* 2513 * If the snapshot we're sending from is redacted, include the redaction 2514 * list in the stream. 
2515 */ 2516 if (dspp->numfromredactsnaps != NUM_SNAPS_NOT_REDACTED) { 2517 ASSERT0P(from_rl); 2518 fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS, 2519 dspp->fromredactsnaps, (uint_t)dspp->numfromredactsnaps); 2520 if (dspp->numfromredactsnaps > 0) { 2521 kmem_free(dspp->fromredactsnaps, 2522 dspp->numfromredactsnaps * sizeof (uint64_t)); 2523 dspp->fromredactsnaps = NULL; 2524 } 2525 } 2526 2527 if (resuming || book_resuming) { 2528 err = setup_resume_points(dspp, to_arg, from_arg, 2529 rlt_arg, smt_arg, resuming, os, redact_rl, nvl); 2530 if (err != 0) 2531 goto out; 2532 } 2533 2534 if (featureflags & DMU_BACKUP_FEATURE_RAW) { 2535 uint64_t ivset_guid = ancestor_zb->zbm_ivset_guid; 2536 nvlist_t *keynvl = NULL; 2537 ASSERT(os->os_encrypted); 2538 2539 err = dsl_crypto_populate_key_nvlist(os, ivset_guid, 2540 &keynvl); 2541 if (err != 0) { 2542 fnvlist_free(nvl); 2543 goto out; 2544 } 2545 2546 fnvlist_add_nvlist(nvl, "crypt_keydata", keynvl); 2547 fnvlist_free(keynvl); 2548 } 2549 2550 if (!nvlist_empty(nvl)) { 2551 payload = fnvlist_pack(nvl, &payload_len); 2552 drr->drr_payloadlen = payload_len; 2553 } 2554 2555 fnvlist_free(nvl); 2556 err = dump_record(&dsc, payload, payload_len); 2557 fnvlist_pack_free(payload, payload_len); 2558 if (err != 0) { 2559 err = dsc.dsc_err; 2560 goto out; 2561 } 2562 2563 setup_to_thread(to_arg, os, dssp, fromtxg, dspp->rawok); 2564 setup_from_thread(from_arg, from_rl, dssp); 2565 setup_redact_list_thread(rlt_arg, dspp, redact_rl, dssp); 2566 setup_merge_thread(smt_arg, dspp, from_arg, to_arg, rlt_arg, os); 2567 setup_reader_thread(srt_arg, dspp, smt_arg, featureflags); 2568 2569 range = bqueue_dequeue(&srt_arg->q); 2570 while (err == 0 && !range->eos_marker) { 2571 err = do_dump(&dsc, range); 2572 range = get_next_range(&srt_arg->q, range); 2573 if (issig()) 2574 err = SET_ERROR(EINTR); 2575 } 2576 2577 /* 2578 * If we hit an error or are interrupted, cancel our worker threads and 2579 * clear the queue of any pending records. The threads will pass the 2580 * cancel up the tree of worker threads, and each one will clean up any 2581 * pending records before exiting. 2582 */ 2583 if (err != 0) { 2584 srt_arg->cancel = B_TRUE; 2585 while (!range->eos_marker) { 2586 range = get_next_range(&srt_arg->q, range); 2587 } 2588 } 2589 range_free(range); 2590 2591 bqueue_destroy(&srt_arg->q); 2592 bqueue_destroy(&smt_arg->q); 2593 if (dspp->redactbook != NULL) 2594 bqueue_destroy(&rlt_arg->q); 2595 bqueue_destroy(&to_arg->q); 2596 bqueue_destroy(&from_arg->q); 2597 2598 if (err == 0 && srt_arg->error != 0) 2599 err = srt_arg->error; 2600 2601 if (err != 0) 2602 goto out; 2603 2604 if (dsc.dsc_pending_op != PENDING_NONE) 2605 if (dump_record(&dsc, NULL, 0) != 0) 2606 err = SET_ERROR(EINTR); 2607 2608 if (err != 0) { 2609 if (err == EINTR && dsc.dsc_err != 0) 2610 err = dsc.dsc_err; 2611 goto out; 2612 } 2613 2614 /* 2615 * Send the DRR_END record if this is not a saved stream. 2616 * Otherwise, the omitted DRR_END record will signal to 2617 * the receive side that the stream is incomplete. 
2618 */ 2619 if (!dspp->savedok) { 2620 memset(drr, 0, sizeof (dmu_replay_record_t)); 2621 drr->drr_type = DRR_END; 2622 drr->drr_u.drr_end.drr_checksum = dsc.dsc_zc; 2623 drr->drr_u.drr_end.drr_toguid = dsc.dsc_toguid; 2624 2625 if (dump_record(&dsc, NULL, 0) != 0) 2626 err = dsc.dsc_err; 2627 } 2628 out: 2629 mutex_enter(&to_ds->ds_sendstream_lock); 2630 list_remove(&to_ds->ds_sendstreams, dssp); 2631 mutex_exit(&to_ds->ds_sendstream_lock); 2632 2633 VERIFY(err != 0 || (dsc.dsc_sent_begin && 2634 (dsc.dsc_sent_end || dspp->savedok))); 2635 2636 kmem_free(drr, sizeof (dmu_replay_record_t)); 2637 kmem_free(dssp, sizeof (dmu_sendstatus_t)); 2638 kmem_free(from_arg, sizeof (*from_arg)); 2639 kmem_free(to_arg, sizeof (*to_arg)); 2640 kmem_free(rlt_arg, sizeof (*rlt_arg)); 2641 kmem_free(smt_arg, sizeof (*smt_arg)); 2642 kmem_free(srt_arg, sizeof (*srt_arg)); 2643 2644 dsl_dataset_long_rele(to_ds, FTAG); 2645 if (from_rl != NULL) { 2646 dsl_redaction_list_long_rele(from_rl, FTAG); 2647 dsl_redaction_list_rele(from_rl, FTAG); 2648 } 2649 if (redact_rl != NULL) { 2650 dsl_redaction_list_long_rele(redact_rl, FTAG); 2651 dsl_redaction_list_rele(redact_rl, FTAG); 2652 } 2653 2654 return (err); 2655 } 2656 2657 int 2658 dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap, 2659 boolean_t embedok, boolean_t large_block_ok, boolean_t compressok, 2660 boolean_t rawok, boolean_t savedok, int outfd, offset_t *off, 2661 dmu_send_outparams_t *dsop) 2662 { 2663 int err; 2664 dsl_dataset_t *fromds; 2665 ds_hold_flags_t dsflags; 2666 struct dmu_send_params dspp = {0}; 2667 dspp.embedok = embedok; 2668 dspp.large_block_ok = large_block_ok; 2669 dspp.compressok = compressok; 2670 dspp.outfd = outfd; 2671 dspp.off = off; 2672 dspp.dso = dsop; 2673 dspp.tag = FTAG; 2674 dspp.rawok = rawok; 2675 dspp.savedok = savedok; 2676 2677 dsflags = (rawok) ? DS_HOLD_FLAG_NONE : DS_HOLD_FLAG_DECRYPT; 2678 err = dsl_pool_hold(pool, FTAG, &dspp.dp); 2679 if (err != 0) 2680 return (err); 2681 2682 err = dsl_dataset_hold_obj_flags(dspp.dp, tosnap, dsflags, FTAG, 2683 &dspp.to_ds); 2684 if (err != 0) { 2685 dsl_pool_rele(dspp.dp, FTAG); 2686 return (err); 2687 } 2688 2689 if (fromsnap != 0) { 2690 err = dsl_dataset_hold_obj(dspp.dp, fromsnap, FTAG, &fromds); 2691 2692 if (err != 0) { 2693 dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG); 2694 dsl_pool_rele(dspp.dp, FTAG); 2695 return (err); 2696 } 2697 dspp.ancestor_zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid; 2698 dspp.ancestor_zb.zbm_creation_txg = 2699 dsl_dataset_phys(fromds)->ds_creation_txg; 2700 dspp.ancestor_zb.zbm_creation_time = 2701 dsl_dataset_phys(fromds)->ds_creation_time; 2702 2703 if (dsl_dataset_is_zapified(fromds)) { 2704 (void) zap_lookup(dspp.dp->dp_meta_objset, 2705 fromds->ds_object, DS_FIELD_IVSET_GUID, 8, 1, 2706 &dspp.ancestor_zb.zbm_ivset_guid); 2707 } 2708 2709 /* See dmu_send for the reasons behind this. 
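 * (The array returned by dsl_dataset_get_uint64_array_feature() is
 * freed when fromds is evicted, so a private copy is made below.)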
*/ 2710 uint64_t *fromredact; 2711 2712 if (!dsl_dataset_get_uint64_array_feature(fromds, 2713 SPA_FEATURE_REDACTED_DATASETS, 2714 &dspp.numfromredactsnaps, 2715 &fromredact)) { 2716 dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED; 2717 } else if (dspp.numfromredactsnaps > 0) { 2718 uint64_t size = dspp.numfromredactsnaps * 2719 sizeof (uint64_t); 2720 dspp.fromredactsnaps = kmem_zalloc(size, KM_SLEEP); 2721 memcpy(dspp.fromredactsnaps, fromredact, size); 2722 } 2723 2724 boolean_t is_before = 2725 dsl_dataset_is_before(dspp.to_ds, fromds, 0); 2726 dspp.is_clone = (dspp.to_ds->ds_dir != 2727 fromds->ds_dir); 2728 dsl_dataset_rele(fromds, FTAG); 2729 if (!is_before) { 2730 dsl_pool_rele(dspp.dp, FTAG); 2731 err = SET_ERROR(EXDEV); 2732 } else { 2733 err = dmu_send_impl(&dspp); 2734 } 2735 } else { 2736 dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED; 2737 err = dmu_send_impl(&dspp); 2738 } 2739 if (dspp.fromredactsnaps) 2740 kmem_free(dspp.fromredactsnaps, 2741 dspp.numfromredactsnaps * sizeof (uint64_t)); 2742 2743 dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG); 2744 return (err); 2745 } 2746 2747 int 2748 dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok, 2749 boolean_t large_block_ok, boolean_t compressok, boolean_t rawok, 2750 boolean_t savedok, uint64_t resumeobj, uint64_t resumeoff, 2751 const char *redactbook, int outfd, offset_t *off, 2752 dmu_send_outparams_t *dsop) 2753 { 2754 int err = 0; 2755 ds_hold_flags_t dsflags; 2756 boolean_t owned = B_FALSE; 2757 dsl_dataset_t *fromds = NULL; 2758 zfs_bookmark_phys_t book = {0}; 2759 struct dmu_send_params dspp = {0}; 2760 2761 dsflags = (rawok) ? DS_HOLD_FLAG_NONE : DS_HOLD_FLAG_DECRYPT; 2762 dspp.tosnap = tosnap; 2763 dspp.embedok = embedok; 2764 dspp.large_block_ok = large_block_ok; 2765 dspp.compressok = compressok; 2766 dspp.outfd = outfd; 2767 dspp.off = off; 2768 dspp.dso = dsop; 2769 dspp.tag = FTAG; 2770 dspp.resumeobj = resumeobj; 2771 dspp.resumeoff = resumeoff; 2772 dspp.rawok = rawok; 2773 dspp.savedok = savedok; 2774 2775 if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL) 2776 return (SET_ERROR(EINVAL)); 2777 2778 err = dsl_pool_hold(tosnap, FTAG, &dspp.dp); 2779 if (err != 0) 2780 return (err); 2781 2782 if (strchr(tosnap, '@') == NULL && spa_writeable(dspp.dp->dp_spa)) { 2783 /* 2784 * We are sending a filesystem or volume. Ensure 2785 * that it doesn't change by owning the dataset. 2786 */ 2787 2788 if (savedok) { 2789 /* 2790 * We are looking for the dataset that represents the 2791 * partially received send stream. If this stream was 2792 * received as a new snapshot of an existing dataset, 2793 * this will be saved in a hidden clone named 2794 * "<pool>/<dataset>/%recv". Otherwise, the stream 2795 * will be saved in the live dataset itself. In 2796 * either case we need to use dsl_dataset_own_force() 2797 * because the stream is marked as inconsistent, 2798 * which would normally make it unavailable to be 2799 * owned. 
2800 */ 2801 char *name = kmem_asprintf("%s/%s", tosnap, 2802 recv_clone_name); 2803 err = dsl_dataset_own_force(dspp.dp, name, dsflags, 2804 FTAG, &dspp.to_ds); 2805 if (err == ENOENT) { 2806 err = dsl_dataset_own_force(dspp.dp, tosnap, 2807 dsflags, FTAG, &dspp.to_ds); 2808 } 2809 2810 if (err == 0) { 2811 owned = B_TRUE; 2812 err = zap_lookup(dspp.dp->dp_meta_objset, 2813 dspp.to_ds->ds_object, 2814 DS_FIELD_RESUME_TOGUID, 8, 1, 2815 &dspp.saved_guid); 2816 } 2817 2818 if (err == 0) { 2819 err = zap_lookup(dspp.dp->dp_meta_objset, 2820 dspp.to_ds->ds_object, 2821 DS_FIELD_RESUME_TONAME, 1, 2822 sizeof (dspp.saved_toname), 2823 dspp.saved_toname); 2824 } 2825 /* Only disown if there was an error in the lookups */ 2826 if (owned && (err != 0)) 2827 dsl_dataset_disown(dspp.to_ds, dsflags, FTAG); 2828 2829 kmem_strfree(name); 2830 } else { 2831 err = dsl_dataset_own(dspp.dp, tosnap, dsflags, 2832 FTAG, &dspp.to_ds); 2833 if (err == 0) 2834 owned = B_TRUE; 2835 } 2836 } else { 2837 err = dsl_dataset_hold_flags(dspp.dp, tosnap, dsflags, FTAG, 2838 &dspp.to_ds); 2839 } 2840 2841 if (err != 0) { 2842 /* Note: dsl dataset is not owned at this point */ 2843 dsl_pool_rele(dspp.dp, FTAG); 2844 return (err); 2845 } 2846 2847 if (redactbook != NULL) { 2848 char path[ZFS_MAX_DATASET_NAME_LEN]; 2849 (void) strlcpy(path, tosnap, sizeof (path)); 2850 char *at = strchr(path, '@'); 2851 if (at == NULL) { 2852 err = EINVAL; 2853 } else { 2854 (void) snprintf(at, sizeof (path) - (at - path), "#%s", 2855 redactbook); 2856 err = dsl_bookmark_lookup(dspp.dp, path, 2857 NULL, &book); 2858 dspp.redactbook = &book; 2859 } 2860 } 2861 2862 if (err != 0) { 2863 dsl_pool_rele(dspp.dp, FTAG); 2864 if (owned) 2865 dsl_dataset_disown(dspp.to_ds, dsflags, FTAG); 2866 else 2867 dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG); 2868 return (err); 2869 } 2870 2871 if (fromsnap != NULL) { 2872 zfs_bookmark_phys_t *zb = &dspp.ancestor_zb; 2873 int fsnamelen; 2874 if (strpbrk(tosnap, "@#") != NULL) 2875 fsnamelen = strpbrk(tosnap, "@#") - tosnap; 2876 else 2877 fsnamelen = strlen(tosnap); 2878 2879 /* 2880 * If the fromsnap is in a different filesystem, then 2881 * mark the send stream as a clone. 2882 */ 2883 if (strncmp(tosnap, fromsnap, fsnamelen) != 0 || 2884 (fromsnap[fsnamelen] != '@' && 2885 fromsnap[fsnamelen] != '#')) { 2886 dspp.is_clone = B_TRUE; 2887 } 2888 2889 if (strchr(fromsnap, '@') != NULL) { 2890 err = dsl_dataset_hold(dspp.dp, fromsnap, FTAG, 2891 &fromds); 2892 2893 if (err != 0) { 2894 ASSERT0P(fromds); 2895 } else { 2896 /* 2897 * We need to make a deep copy of the redact 2898 * snapshots of the from snapshot, because the 2899 * array will be freed when we evict from_ds. 
2900 */ 2901 uint64_t *fromredact; 2902 if (!dsl_dataset_get_uint64_array_feature( 2903 fromds, SPA_FEATURE_REDACTED_DATASETS, 2904 &dspp.numfromredactsnaps, 2905 &fromredact)) { 2906 dspp.numfromredactsnaps = 2907 NUM_SNAPS_NOT_REDACTED; 2908 } else if (dspp.numfromredactsnaps > 0) { 2909 uint64_t size = 2910 dspp.numfromredactsnaps * 2911 sizeof (uint64_t); 2912 dspp.fromredactsnaps = kmem_zalloc(size, 2913 KM_SLEEP); 2914 memcpy(dspp.fromredactsnaps, fromredact, 2915 size); 2916 } 2917 if (!dsl_dataset_is_before(dspp.to_ds, fromds, 2918 0)) { 2919 err = SET_ERROR(EXDEV); 2920 } else { 2921 zb->zbm_creation_txg = 2922 dsl_dataset_phys(fromds)-> 2923 ds_creation_txg; 2924 zb->zbm_creation_time = 2925 dsl_dataset_phys(fromds)-> 2926 ds_creation_time; 2927 zb->zbm_guid = 2928 dsl_dataset_phys(fromds)->ds_guid; 2929 zb->zbm_redaction_obj = 0; 2930 2931 if (dsl_dataset_is_zapified(fromds)) { 2932 (void) zap_lookup( 2933 dspp.dp->dp_meta_objset, 2934 fromds->ds_object, 2935 DS_FIELD_IVSET_GUID, 8, 1, 2936 &zb->zbm_ivset_guid); 2937 } 2938 } 2939 dsl_dataset_rele(fromds, FTAG); 2940 } 2941 } else { 2942 dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED; 2943 err = dsl_bookmark_lookup(dspp.dp, fromsnap, dspp.to_ds, 2944 zb); 2945 if (err == EXDEV && zb->zbm_redaction_obj != 0 && 2946 zb->zbm_guid == 2947 dsl_dataset_phys(dspp.to_ds)->ds_guid) 2948 err = 0; 2949 } 2950 2951 if (err == 0) { 2952 /* dmu_send_impl will call dsl_pool_rele for us. */ 2953 err = dmu_send_impl(&dspp); 2954 } else { 2955 if (dspp.fromredactsnaps) 2956 kmem_free(dspp.fromredactsnaps, 2957 dspp.numfromredactsnaps * 2958 sizeof (uint64_t)); 2959 dsl_pool_rele(dspp.dp, FTAG); 2960 } 2961 } else { 2962 dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED; 2963 err = dmu_send_impl(&dspp); 2964 } 2965 if (owned) 2966 dsl_dataset_disown(dspp.to_ds, dsflags, FTAG); 2967 else 2968 dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG); 2969 return (err); 2970 } 2971 2972 static int 2973 dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed, 2974 uint64_t compressed, boolean_t stream_compressed, uint64_t *sizep) 2975 { 2976 int err = 0; 2977 uint64_t size; 2978 /* 2979 * Assume that space (both on-disk and in-stream) is dominated by 2980 * data. We will adjust for indirect blocks and the copies property, 2981 * but ignore per-object space used (eg, dnodes and DRR_OBJECT records). 2982 */ 2983 2984 uint64_t recordsize; 2985 uint64_t record_count; 2986 objset_t *os; 2987 VERIFY0(dmu_objset_from_ds(ds, &os)); 2988 2989 /* Assume all (uncompressed) blocks are recordsize. */ 2990 if (zfs_override_estimate_recordsize != 0) { 2991 recordsize = zfs_override_estimate_recordsize; 2992 } else if (os->os_phys->os_type == DMU_OST_ZVOL) { 2993 err = dsl_prop_get_int_ds(ds, 2994 zfs_prop_to_name(ZFS_PROP_VOLBLOCKSIZE), &recordsize); 2995 } else { 2996 err = dsl_prop_get_int_ds(ds, 2997 zfs_prop_to_name(ZFS_PROP_RECORDSIZE), &recordsize); 2998 } 2999 if (err != 0) 3000 return (err); 3001 record_count = uncompressed / recordsize; 3002 3003 /* 3004 * If we're estimating a send size for a compressed stream, use the 3005 * compressed data size to estimate the stream size. Otherwise, use the 3006 * uncompressed data size. 3007 */ 3008 size = stream_compressed ? compressed : uncompressed; 3009 3010 /* 3011 * Subtract out approximate space used by indirect blocks. 3012 * Assume most space is used by data blocks (non-indirect, non-dnode). 3013 * Assume no ditto blocks or internal fragmentation. 
3014 * 3015 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per 3016 * block. 3017 */ 3018 size -= record_count * sizeof (blkptr_t); 3019 3020 /* Add in the space for the record associated with each block. */ 3021 size += record_count * sizeof (dmu_replay_record_t); 3022 3023 *sizep = size; 3024 3025 return (0); 3026 } 3027 3028 int 3029 dmu_send_estimate_fast(dsl_dataset_t *origds, dsl_dataset_t *fromds, 3030 zfs_bookmark_phys_t *frombook, boolean_t stream_compressed, 3031 boolean_t saved, uint64_t *sizep) 3032 { 3033 int err; 3034 dsl_dataset_t *ds = origds; 3035 uint64_t uncomp, comp; 3036 3037 ASSERT(dsl_pool_config_held(origds->ds_dir->dd_pool)); 3038 ASSERT(fromds == NULL || frombook == NULL); 3039 3040 /* 3041 * If this is a saved send we may actually be sending 3042 * from the %recv clone used for resuming. 3043 */ 3044 if (saved) { 3045 objset_t *mos = origds->ds_dir->dd_pool->dp_meta_objset; 3046 uint64_t guid; 3047 char dsname[ZFS_MAX_DATASET_NAME_LEN + 6]; 3048 3049 dsl_dataset_name(origds, dsname); 3050 (void) strcat(dsname, "/"); 3051 (void) strlcat(dsname, recv_clone_name, sizeof (dsname)); 3052 3053 err = dsl_dataset_hold(origds->ds_dir->dd_pool, 3054 dsname, FTAG, &ds); 3055 if (err != ENOENT && err != 0) { 3056 return (err); 3057 } else if (err == ENOENT) { 3058 ds = origds; 3059 } 3060 3061 /* check that this dataset has partially received data */ 3062 err = zap_lookup(mos, ds->ds_object, 3063 DS_FIELD_RESUME_TOGUID, 8, 1, &guid); 3064 if (err != 0) { 3065 err = SET_ERROR(err == ENOENT ? EINVAL : err); 3066 goto out; 3067 } 3068 3069 err = zap_lookup(mos, ds->ds_object, 3070 DS_FIELD_RESUME_TONAME, 1, sizeof (dsname), dsname); 3071 if (err != 0) { 3072 err = SET_ERROR(err == ENOENT ? EINVAL : err); 3073 goto out; 3074 } 3075 } 3076 3077 /* tosnap must be a snapshot or the target of a saved send */ 3078 if (!ds->ds_is_snapshot && ds == origds) 3079 return (SET_ERROR(EINVAL)); 3080 3081 if (fromds != NULL) { 3082 uint64_t used; 3083 if (!fromds->ds_is_snapshot) { 3084 err = SET_ERROR(EINVAL); 3085 goto out; 3086 } 3087 3088 if (!dsl_dataset_is_before(ds, fromds, 0)) { 3089 err = SET_ERROR(EXDEV); 3090 goto out; 3091 } 3092 3093 err = dsl_dataset_space_written(fromds, ds, &used, &comp, 3094 &uncomp); 3095 if (err != 0) 3096 goto out; 3097 } else if (frombook != NULL) { 3098 uint64_t used; 3099 err = dsl_dataset_space_written_bookmark(frombook, ds, &used, 3100 &comp, &uncomp); 3101 if (err != 0) 3102 goto out; 3103 } else { 3104 uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes; 3105 comp = dsl_dataset_phys(ds)->ds_compressed_bytes; 3106 } 3107 3108 err = dmu_adjust_send_estimate_for_indirects(ds, uncomp, comp, 3109 stream_compressed, sizep); 3110 /* 3111 * Add the size of the BEGIN and END records to the estimate. 
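 *
 * Putting the whole estimate together (illustrative only), with
 * N ~= uncompressed / recordsize:
 *
 *   estimate ~= (stream_compressed ? compressed : uncompressed)
 *               - N * sizeof (blkptr_t)
 *               + (N + 2) * sizeof (dmu_replay_record_t)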
3112 */ 3113 *sizep += 2 * sizeof (dmu_replay_record_t); 3114 3115 out: 3116 if (ds != origds) 3117 dsl_dataset_rele(ds, FTAG); 3118 return (err); 3119 } 3120 3121 ZFS_MODULE_PARAM(zfs_send, zfs_send_, corrupt_data, INT, ZMOD_RW, 3122 "Allow sending corrupt data"); 3123 3124 ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_length, UINT, ZMOD_RW, 3125 "Maximum send queue length"); 3126 3127 ZFS_MODULE_PARAM(zfs_send, zfs_send_, unmodified_spill_blocks, INT, ZMOD_RW, 3128 "Send unmodified spill blocks"); 3129 3130 ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_length, UINT, ZMOD_RW, 3131 "Maximum send queue length for non-prefetch queues"); 3132 3133 ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_ff, UINT, ZMOD_RW, 3134 "Send queue fill fraction"); 3135 3136 ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_ff, UINT, ZMOD_RW, 3137 "Send queue fill fraction for non-prefetch queues"); 3138 3139 ZFS_MODULE_PARAM(zfs_send, zfs_, override_estimate_recordsize, UINT, ZMOD_RW, 3140 "Override block size estimate with fixed size"); 3141
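
/*
 * On Linux these tunables are exposed as module parameters (for example,
 * /sys/module/zfs/parameters/zfs_send_queue_length); see zfs(4) for the
 * user-facing documentation of each.
 */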