xref: /illumos-gate/usr/src/uts/common/fs/zfs/dmu_recv.c (revision 32e09e17e4529edf39ffb44fb13cdb6a0fb45733)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
24  * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
25  * Copyright (c) 2014, Joyent, Inc. All rights reserved.
26  * Copyright 2014 HybridCluster. All rights reserved.
27  * Copyright 2016 RackTop Systems.
28  * Copyright (c) 2014 Integros [integros.com]
29  */
30 
31 #include <sys/dmu.h>
32 #include <sys/dmu_impl.h>
33 #include <sys/dmu_tx.h>
34 #include <sys/dbuf.h>
35 #include <sys/dnode.h>
36 #include <sys/zfs_context.h>
37 #include <sys/dmu_objset.h>
38 #include <sys/dmu_traverse.h>
39 #include <sys/dsl_dataset.h>
40 #include <sys/dsl_dir.h>
41 #include <sys/dsl_prop.h>
42 #include <sys/dsl_pool.h>
43 #include <sys/dsl_synctask.h>
44 #include <sys/zfs_ioctl.h>
45 #include <sys/zap.h>
46 #include <sys/zio_checksum.h>
47 #include <sys/zfs_znode.h>
48 #include <zfs_fletcher.h>
49 #include <sys/avl.h>
50 #include <sys/ddt.h>
51 #include <sys/zfs_onexit.h>
52 #include <sys/dmu_recv.h>
53 #include <sys/dsl_destroy.h>
54 #include <sys/blkptr.h>
55 #include <sys/dsl_bookmark.h>
56 #include <sys/zfeature.h>
57 #include <sys/bqueue.h>
58 
59 int zfs_recv_queue_length = SPA_MAXBLOCKSIZE;
60 
61 static char *dmu_recv_tag = "dmu_recv_tag";
62 const char *recv_clone_name = "%recv";
63 
64 static void byteswap_record(dmu_replay_record_t *drr);
65 
66 typedef struct dmu_recv_begin_arg {
67 	const char *drba_origin;
68 	dmu_recv_cookie_t *drba_cookie;
69 	cred_t *drba_cred;
70 	uint64_t drba_snapobj;
71 } dmu_recv_begin_arg_t;
72 
73 static int
74 recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
75     uint64_t fromguid)
76 {
77 	uint64_t val;
78 	int error;
79 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
80 
81 	/* temporary clone name must not exist */
82 	error = zap_lookup(dp->dp_meta_objset,
83 	    dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
84 	    8, 1, &val);
85 	if (error != ENOENT)
86 		return (error == 0 ? EBUSY : error);
87 
88 	/* new snapshot name must not exist */
89 	error = zap_lookup(dp->dp_meta_objset,
90 	    dsl_dataset_phys(ds)->ds_snapnames_zapobj,
91 	    drba->drba_cookie->drc_tosnap, 8, 1, &val);
92 	if (error != ENOENT)
93 		return (error == 0 ? EEXIST : error);
94 
95 	/*
96 	 * Check snapshot limit before receiving. We'll recheck again at the
97 	 * end, but might as well abort before receiving if we're already over
98 	 * the limit.
99 	 *
100 	 * Note that we do not check the file system limit with
101 	 * dsl_dir_fscount_check because the temporary %clones don't count
102 	 * against that limit.
103 	 */
104 	error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
105 	    NULL, drba->drba_cred);
106 	if (error != 0)
107 		return (error);
108 
109 	if (fromguid != 0) {
110 		dsl_dataset_t *snap;
111 		uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;
112 
113 		/* Find snapshot in this dir that matches fromguid. */
114 		while (obj != 0) {
115 			error = dsl_dataset_hold_obj(dp, obj, FTAG,
116 			    &snap);
117 			if (error != 0)
118 				return (SET_ERROR(ENODEV));
119 			if (snap->ds_dir != ds->ds_dir) {
120 				dsl_dataset_rele(snap, FTAG);
121 				return (SET_ERROR(ENODEV));
122 			}
123 			if (dsl_dataset_phys(snap)->ds_guid == fromguid)
124 				break;
125 			obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
126 			dsl_dataset_rele(snap, FTAG);
127 		}
128 		if (obj == 0)
129 			return (SET_ERROR(ENODEV));
130 
131 		if (drba->drba_cookie->drc_force) {
132 			drba->drba_snapobj = obj;
133 		} else {
134 			/*
135 			 * If we are not forcing, there must be no
136 			 * changes since fromsnap.
137 			 */
138 			if (dsl_dataset_modified_since_snap(ds, snap)) {
139 				dsl_dataset_rele(snap, FTAG);
140 				return (SET_ERROR(ETXTBSY));
141 			}
142 			drba->drba_snapobj = ds->ds_prev->ds_object;
143 		}
144 
145 		dsl_dataset_rele(snap, FTAG);
146 	} else {
147 		/* if full, then must be forced */
148 		if (!drba->drba_cookie->drc_force)
149 			return (SET_ERROR(EEXIST));
150 		/* start from $ORIGIN@$ORIGIN, if supported */
151 		drba->drba_snapobj = dp->dp_origin_snap != NULL ?
152 		    dp->dp_origin_snap->ds_object : 0;
153 	}
154 
155 	return (0);
156 
157 }
158 
159 static int
160 dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
161 {
162 	dmu_recv_begin_arg_t *drba = arg;
163 	dsl_pool_t *dp = dmu_tx_pool(tx);
164 	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
165 	uint64_t fromguid = drrb->drr_fromguid;
166 	int flags = drrb->drr_flags;
167 	int error;
168 	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
169 	dsl_dataset_t *ds;
170 	const char *tofs = drba->drba_cookie->drc_tofs;
171 
172 	/* already checked */
173 	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
174 	ASSERT(!(featureflags & DMU_BACKUP_FEATURE_RESUMING));
175 
176 	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
177 	    DMU_COMPOUNDSTREAM ||
178 	    drrb->drr_type >= DMU_OST_NUMTYPES ||
179 	    ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
180 		return (SET_ERROR(EINVAL));
181 
182 	/* Verify pool version supports SA if SA_SPILL feature set */
183 	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
184 	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
185 		return (SET_ERROR(ENOTSUP));
186 
187 	if (drba->drba_cookie->drc_resumable &&
188 	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET))
189 		return (SET_ERROR(ENOTSUP));
190 
191 	/*
192 	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
193 	 * record to a plain WRITE record, so the pool must have the
194 	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
195 	 * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
196 	 */
197 	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
198 	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
199 		return (SET_ERROR(ENOTSUP));
200 	if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
201 	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
202 		return (SET_ERROR(ENOTSUP));
203 
204 	/*
205 	 * The receiving code doesn't know how to translate large blocks
206 	 * to smaller ones, so the pool must have the LARGE_BLOCKS
207 	 * feature enabled if the stream has LARGE_BLOCKS. Same with
208 	 * large dnodes.
209 	 */
210 	if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
211 	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
212 		return (SET_ERROR(ENOTSUP));
213 	if ((featureflags & DMU_BACKUP_FEATURE_LARGE_DNODE) &&
214 	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_DNODE))
215 		return (SET_ERROR(ENOTSUP));
216 
217 	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
218 	if (error == 0) {
219 		/* target fs already exists; recv into temp clone */
220 
221 		/* Can't recv a clone into an existing fs */
222 		if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
223 			dsl_dataset_rele(ds, FTAG);
224 			return (SET_ERROR(EINVAL));
225 		}
226 
227 		error = recv_begin_check_existing_impl(drba, ds, fromguid);
228 		dsl_dataset_rele(ds, FTAG);
229 	} else if (error == ENOENT) {
230 		/* target fs does not exist; must be a full backup or clone */
231 		char buf[ZFS_MAX_DATASET_NAME_LEN];
232 
233 		/*
234 		 * If it's a non-clone incremental, we are missing the
235 		 * target fs, so fail the recv.
236 		 */
237 		if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
238 		    drba->drba_origin))
239 			return (SET_ERROR(ENOENT));
240 
241 		/*
242 		 * If we're receiving a full send as a clone, and it doesn't
243 		 * contain all the necessary free records and freeobject
244 		 * records, reject it.
245 		 */
246 		if (fromguid == 0 && drba->drba_origin &&
247 		    !(flags & DRR_FLAG_FREERECORDS))
248 			return (SET_ERROR(EINVAL));
249 
250 		/* Open the parent of tofs */
251 		ASSERT3U(strlen(tofs), <, sizeof (buf));
252 		(void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
253 		error = dsl_dataset_hold(dp, buf, FTAG, &ds);
254 		if (error != 0)
255 			return (error);
256 
257 		/*
258 		 * Check filesystem and snapshot limits before receiving. We'll
259 		 * recheck snapshot limits again at the end (we create the
260 		 * filesystems and increment those counts during begin_sync).
261 		 */
262 		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
263 		    ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
264 		if (error != 0) {
265 			dsl_dataset_rele(ds, FTAG);
266 			return (error);
267 		}
268 
269 		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
270 		    ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
271 		if (error != 0) {
272 			dsl_dataset_rele(ds, FTAG);
273 			return (error);
274 		}
275 
276 		if (drba->drba_origin != NULL) {
277 			dsl_dataset_t *origin;
278 			error = dsl_dataset_hold(dp, drba->drba_origin,
279 			    FTAG, &origin);
280 			if (error != 0) {
281 				dsl_dataset_rele(ds, FTAG);
282 				return (error);
283 			}
284 			if (!origin->ds_is_snapshot) {
285 				dsl_dataset_rele(origin, FTAG);
286 				dsl_dataset_rele(ds, FTAG);
287 				return (SET_ERROR(EINVAL));
288 			}
289 			if (dsl_dataset_phys(origin)->ds_guid != fromguid &&
290 			    fromguid != 0) {
291 				dsl_dataset_rele(origin, FTAG);
292 				dsl_dataset_rele(ds, FTAG);
293 				return (SET_ERROR(ENODEV));
294 			}
295 			dsl_dataset_rele(origin, FTAG);
296 		}
297 		dsl_dataset_rele(ds, FTAG);
298 		error = 0;
299 	}
300 	return (error);
301 }
302 
303 static void
304 dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
305 {
306 	dmu_recv_begin_arg_t *drba = arg;
307 	dsl_pool_t *dp = dmu_tx_pool(tx);
308 	objset_t *mos = dp->dp_meta_objset;
309 	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
310 	const char *tofs = drba->drba_cookie->drc_tofs;
311 	dsl_dataset_t *ds, *newds;
312 	uint64_t dsobj;
313 	int error;
314 	uint64_t crflags = 0;
315 
316 	if (drrb->drr_flags & DRR_FLAG_CI_DATA)
317 		crflags |= DS_FLAG_CI_DATASET;
318 
319 	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
320 	if (error == 0) {
321 		/* create temporary clone */
322 		dsl_dataset_t *snap = NULL;
323 		if (drba->drba_snapobj != 0) {
324 			VERIFY0(dsl_dataset_hold_obj(dp,
325 			    drba->drba_snapobj, FTAG, &snap));
326 		}
327 		dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name,
328 		    snap, crflags, drba->drba_cred, tx);
329 		if (drba->drba_snapobj != 0)
330 			dsl_dataset_rele(snap, FTAG);
331 		dsl_dataset_rele(ds, FTAG);
332 	} else {
333 		dsl_dir_t *dd;
334 		const char *tail;
335 		dsl_dataset_t *origin = NULL;
336 
337 		VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));
338 
339 		if (drba->drba_origin != NULL) {
340 			VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
341 			    FTAG, &origin));
342 		}
343 
344 		/* Create new dataset. */
345 		dsobj = dsl_dataset_create_sync(dd,
346 		    strrchr(tofs, '/') + 1,
347 		    origin, crflags, drba->drba_cred, tx);
348 		if (origin != NULL)
349 			dsl_dataset_rele(origin, FTAG);
350 		dsl_dir_rele(dd, FTAG);
351 		drba->drba_cookie->drc_newfs = B_TRUE;
352 	}
353 	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &newds));
354 
355 	if (drba->drba_cookie->drc_resumable) {
356 		dsl_dataset_zapify(newds, tx);
357 		if (drrb->drr_fromguid != 0) {
358 			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID,
359 			    8, 1, &drrb->drr_fromguid, tx));
360 		}
361 		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TOGUID,
362 		    8, 1, &drrb->drr_toguid, tx));
363 		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TONAME,
364 		    1, strlen(drrb->drr_toname) + 1, drrb->drr_toname, tx));
365 		uint64_t one = 1;
366 		uint64_t zero = 0;
367 		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OBJECT,
368 		    8, 1, &one, tx));
369 		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OFFSET,
370 		    8, 1, &zero, tx));
371 		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_BYTES,
372 		    8, 1, &zero, tx));
373 		if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
374 		    DMU_BACKUP_FEATURE_LARGE_BLOCKS) {
375 			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_LARGEBLOCK,
376 			    8, 1, &one, tx));
377 		}
378 		if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
379 		    DMU_BACKUP_FEATURE_EMBED_DATA) {
380 			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_EMBEDOK,
381 			    8, 1, &one, tx));
382 		}
383 		if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
384 		    DMU_BACKUP_FEATURE_COMPRESSED) {
385 			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_COMPRESSOK,
386 			    8, 1, &one, tx));
387 		}
388 	}
389 
390 	dmu_buf_will_dirty(newds->ds_dbuf, tx);
391 	dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;
392 
393 	/*
394 	 * If we actually created a non-clone, we need to create the
395 	 * objset in our new dataset.
396 	 */
397 	rrw_enter(&newds->ds_bp_rwlock, RW_READER, FTAG);
398 	if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds))) {
399 		(void) dmu_objset_create_impl(dp->dp_spa,
400 		    newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
401 	}
402 	rrw_exit(&newds->ds_bp_rwlock, FTAG);
403 
404 	drba->drba_cookie->drc_ds = newds;
405 
406 	spa_history_log_internal_ds(newds, "receive", tx, "");
407 }
408 
409 static int
410 dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
411 {
412 	dmu_recv_begin_arg_t *drba = arg;
413 	dsl_pool_t *dp = dmu_tx_pool(tx);
414 	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
415 	int error;
416 	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
417 	dsl_dataset_t *ds;
418 	const char *tofs = drba->drba_cookie->drc_tofs;
419 
420 	/* 6 extra bytes for /%recv */
421 	char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];
422 
423 	/* already checked */
424 	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
425 	ASSERT(featureflags & DMU_BACKUP_FEATURE_RESUMING);
426 
427 	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
428 	    DMU_COMPOUNDSTREAM ||
429 	    drrb->drr_type >= DMU_OST_NUMTYPES)
430 		return (SET_ERROR(EINVAL));
431 
432 	/* Verify pool version supports SA if SA_SPILL feature set */
433 	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
434 	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
435 		return (SET_ERROR(ENOTSUP));
436 
437 	/*
438 	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
439 	 * record to a plain WRITE record, so the pool must have the
440 	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
441 	 * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
442 	 */
443 	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
444 	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
445 		return (SET_ERROR(ENOTSUP));
446 	if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
447 	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
448 		return (SET_ERROR(ENOTSUP));
449 
450 	/*
451 	 * The receiving code doesn't know how to translate large blocks
452 	 * to smaller ones, so the pool must have the LARGE_BLOCKS
453 	 * feature enabled if the stream has LARGE_BLOCKS. Same with
454 	 * large dnodes.
455 	 */
456 	if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
457 	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
458 		return (SET_ERROR(ENOTSUP));
459 	if ((featureflags & DMU_BACKUP_FEATURE_LARGE_DNODE) &&
460 	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_DNODE))
461 		return (SET_ERROR(ENOTSUP));
462 
463 	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
464 	    tofs, recv_clone_name);
465 
466 	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
467 		/* %recv does not exist; continue in tofs */
468 		error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
469 		if (error != 0)
470 			return (error);
471 	}
472 
473 	/* check that ds is marked inconsistent */
474 	if (!DS_IS_INCONSISTENT(ds)) {
475 		dsl_dataset_rele(ds, FTAG);
476 		return (SET_ERROR(EINVAL));
477 	}
478 
479 	/* check that there is resuming data, and that the toguid matches */
480 	if (!dsl_dataset_is_zapified(ds)) {
481 		dsl_dataset_rele(ds, FTAG);
482 		return (SET_ERROR(EINVAL));
483 	}
484 	uint64_t val;
485 	error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
486 	    DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
487 	if (error != 0 || drrb->drr_toguid != val) {
488 		dsl_dataset_rele(ds, FTAG);
489 		return (SET_ERROR(EINVAL));
490 	}
491 
492 	/*
493 	 * Check if the receive is still running.  If so, it will be owned.
494 	 * Note that nothing else can own the dataset (e.g. after the receive
495 	 * fails) because it will be marked inconsistent.
496 	 */
497 	if (dsl_dataset_has_owner(ds)) {
498 		dsl_dataset_rele(ds, FTAG);
499 		return (SET_ERROR(EBUSY));
500 	}
501 
502 	/* There should not be any snapshots of this fs yet. */
503 	if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
504 		dsl_dataset_rele(ds, FTAG);
505 		return (SET_ERROR(EINVAL));
506 	}
507 
508 	/*
509 	 * Note: resume point will be checked when we process the first WRITE
510 	 * record.
511 	 */
512 
513 	/* check that the origin matches */
514 	val = 0;
515 	(void) zap_lookup(dp->dp_meta_objset, ds->ds_object,
516 	    DS_FIELD_RESUME_FROMGUID, sizeof (val), 1, &val);
517 	if (drrb->drr_fromguid != val) {
518 		dsl_dataset_rele(ds, FTAG);
519 		return (SET_ERROR(EINVAL));
520 	}
521 
522 	dsl_dataset_rele(ds, FTAG);
523 	return (0);
524 }
525 
526 static void
527 dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
528 {
529 	dmu_recv_begin_arg_t *drba = arg;
530 	dsl_pool_t *dp = dmu_tx_pool(tx);
531 	const char *tofs = drba->drba_cookie->drc_tofs;
532 	dsl_dataset_t *ds;
533 	uint64_t dsobj;
534 	/* 6 extra bytes for /%recv */
535 	char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];
536 
537 	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
538 	    tofs, recv_clone_name);
539 
540 	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
541 		/* %recv does not exist; continue in tofs */
542 		VERIFY0(dsl_dataset_hold(dp, tofs, FTAG, &ds));
543 		drba->drba_cookie->drc_newfs = B_TRUE;
544 	}
545 
546 	/* clear the inconsistent flag so that we can own it */
547 	ASSERT(DS_IS_INCONSISTENT(ds));
548 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
549 	dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
550 	dsobj = ds->ds_object;
551 	dsl_dataset_rele(ds, FTAG);
552 
553 	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &ds));
554 
555 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
556 	dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;
557 
558 	rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
559 	ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)));
560 	rrw_exit(&ds->ds_bp_rwlock, FTAG);
561 
562 	drba->drba_cookie->drc_ds = ds;
563 
564 	spa_history_log_internal_ds(ds, "resume receive", tx, "");
565 }
566 
567 /*
568  * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
569  * succeeds; otherwise we will leak the holds on the datasets.
570  */
571 int
572 dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
573     boolean_t force, boolean_t resumable, char *origin, dmu_recv_cookie_t *drc)
574 {
575 	dmu_recv_begin_arg_t drba = { 0 };
576 
577 	bzero(drc, sizeof (dmu_recv_cookie_t));
578 	drc->drc_drr_begin = drr_begin;
579 	drc->drc_drrb = &drr_begin->drr_u.drr_begin;
580 	drc->drc_tosnap = tosnap;
581 	drc->drc_tofs = tofs;
582 	drc->drc_force = force;
583 	drc->drc_resumable = resumable;
584 	drc->drc_cred = CRED();
585 	drc->drc_clone = (origin != NULL);
586 
587 	if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
588 		drc->drc_byteswap = B_TRUE;
589 		(void) fletcher_4_incremental_byteswap(drr_begin,
590 		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
591 		byteswap_record(drr_begin);
592 	} else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
593 		(void) fletcher_4_incremental_native(drr_begin,
594 		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
595 	} else {
596 		return (SET_ERROR(EINVAL));
597 	}
598 
599 	drba.drba_origin = origin;
600 	drba.drba_cookie = drc;
601 	drba.drba_cred = CRED();
602 
603 	if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
604 	    DMU_BACKUP_FEATURE_RESUMING) {
605 		return (dsl_sync_task(tofs,
606 		    dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
607 		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
608 	} else  {
609 		return (dsl_sync_task(tofs,
610 		    dmu_recv_begin_check, dmu_recv_begin_sync,
611 		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
612 	}
613 }
614 
615 struct receive_record_arg {
616 	dmu_replay_record_t header;
617 	void *payload; /* Pointer to a buffer containing the payload */
618 	/*
619 	 * If the record is a write, pointer to the arc_buf_t containing the
620 	 * payload.
621 	 */
622 	arc_buf_t *write_buf;
623 	int payload_size;
624 	uint64_t bytes_read; /* bytes read from stream when record created */
625 	boolean_t eos_marker; /* Marks the end of the stream */
626 	bqueue_node_t node;
627 };
628 
629 struct receive_writer_arg {
630 	objset_t *os;
631 	boolean_t byteswap;
632 	bqueue_t q;
633 
634 	/*
635 	 * These three args are used to signal to the main thread that we're
636 	 * done.
637 	 */
638 	kmutex_t mutex;
639 	kcondvar_t cv;
640 	boolean_t done;
641 
642 	int err;
643 	/* A map from guid to dataset to help handle dedup'd streams. */
644 	avl_tree_t *guid_to_ds_map;
645 	boolean_t resumable;
646 	uint64_t last_object;
647 	uint64_t last_offset;
648 	uint64_t max_object; /* highest object ID referenced in stream */
649 	uint64_t bytes_read; /* bytes read when current record created */
650 };
651 
652 struct objlist {
653 	list_t list; /* List of struct receive_objnode. */
654 	/*
655 	 * Last object looked up. Used to assert that objects are being looked
656 	 * up in ascending order.
657 	 */
658 	uint64_t last_lookup;
659 };
660 
661 struct receive_objnode {
662 	list_node_t node;
663 	uint64_t object;
664 };
665 
666 struct receive_arg {
667 	objset_t *os;
668 	vnode_t *vp; /* The vnode to read the stream from */
669 	uint64_t voff; /* The current offset in the stream */
670 	uint64_t bytes_read;
671 	/*
672 	 * A record that has had its payload read in, but hasn't yet been handed
673 	 * off to the worker thread.
674 	 */
675 	struct receive_record_arg *rrd;
676 	/* A record that has had its header read in, but not its payload. */
677 	struct receive_record_arg *next_rrd;
678 	zio_cksum_t cksum;
679 	zio_cksum_t prev_cksum;
680 	int err;
681 	boolean_t byteswap;
682 	/* Sorted list of objects not to issue prefetches for. */
683 	struct objlist ignore_objlist;
684 };
685 
686 typedef struct guid_map_entry {
687 	uint64_t	guid;
688 	dsl_dataset_t	*gme_ds;
689 	avl_node_t	avlnode;
690 } guid_map_entry_t;
691 
692 static int
693 guid_compare(const void *arg1, const void *arg2)
694 {
695 	const guid_map_entry_t *gmep1 = arg1;
696 	const guid_map_entry_t *gmep2 = arg2;
697 
698 	if (gmep1->guid < gmep2->guid)
699 		return (-1);
700 	else if (gmep1->guid > gmep2->guid)
701 		return (1);
702 	return (0);
703 }
704 
705 static void
706 free_guid_map_onexit(void *arg)
707 {
708 	avl_tree_t *ca = arg;
709 	void *cookie = NULL;
710 	guid_map_entry_t *gmep;
711 
712 	while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
713 		dsl_dataset_long_rele(gmep->gme_ds, gmep);
714 		dsl_dataset_rele(gmep->gme_ds, gmep);
715 		kmem_free(gmep, sizeof (guid_map_entry_t));
716 	}
717 	avl_destroy(ca);
718 	kmem_free(ca, sizeof (avl_tree_t));
719 }
720 
721 static int
722 receive_read(struct receive_arg *ra, int len, void *buf)
723 {
724 	int done = 0;
725 
726 	/*
727 	 * The code doesn't rely on this (lengths being multiples of 8).  See
728 	 * comment in dump_bytes.
729 	 */
730 	ASSERT0(len % 8);
731 
732 	while (done < len) {
733 		ssize_t resid;
734 
735 		ra->err = vn_rdwr(UIO_READ, ra->vp,
736 		    (char *)buf + done, len - done,
737 		    ra->voff, UIO_SYSSPACE, FAPPEND,
738 		    RLIM64_INFINITY, CRED(), &resid);
739 
740 		if (resid == len - done) {
741 			/*
742 			 * Note: ECKSUM indicates that the receive
743 			 * was interrupted and can potentially be resumed.
744 			 */
745 			ra->err = SET_ERROR(ECKSUM);
746 		}
747 		ra->voff += len - done - resid;
748 		done = len - resid;
749 		if (ra->err != 0)
750 			return (ra->err);
751 	}
752 
753 	ra->bytes_read += len;
754 
755 	ASSERT3U(done, ==, len);
756 	return (0);
757 }
758 
759 static void
760 byteswap_record(dmu_replay_record_t *drr)
761 {
762 #define	DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
763 #define	DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
764 	drr->drr_type = BSWAP_32(drr->drr_type);
765 	drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);
766 
767 	switch (drr->drr_type) {
768 	case DRR_BEGIN:
769 		DO64(drr_begin.drr_magic);
770 		DO64(drr_begin.drr_versioninfo);
771 		DO64(drr_begin.drr_creation_time);
772 		DO32(drr_begin.drr_type);
773 		DO32(drr_begin.drr_flags);
774 		DO64(drr_begin.drr_toguid);
775 		DO64(drr_begin.drr_fromguid);
776 		break;
777 	case DRR_OBJECT:
778 		DO64(drr_object.drr_object);
779 		DO32(drr_object.drr_type);
780 		DO32(drr_object.drr_bonustype);
781 		DO32(drr_object.drr_blksz);
782 		DO32(drr_object.drr_bonuslen);
783 		DO64(drr_object.drr_toguid);
784 		break;
785 	case DRR_FREEOBJECTS:
786 		DO64(drr_freeobjects.drr_firstobj);
787 		DO64(drr_freeobjects.drr_numobjs);
788 		DO64(drr_freeobjects.drr_toguid);
789 		break;
790 	case DRR_WRITE:
791 		DO64(drr_write.drr_object);
792 		DO32(drr_write.drr_type);
793 		DO64(drr_write.drr_offset);
794 		DO64(drr_write.drr_logical_size);
795 		DO64(drr_write.drr_toguid);
796 		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum);
797 		DO64(drr_write.drr_key.ddk_prop);
798 		DO64(drr_write.drr_compressed_size);
799 		break;
800 	case DRR_WRITE_BYREF:
801 		DO64(drr_write_byref.drr_object);
802 		DO64(drr_write_byref.drr_offset);
803 		DO64(drr_write_byref.drr_length);
804 		DO64(drr_write_byref.drr_toguid);
805 		DO64(drr_write_byref.drr_refguid);
806 		DO64(drr_write_byref.drr_refobject);
807 		DO64(drr_write_byref.drr_refoffset);
808 		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref.
809 		    drr_key.ddk_cksum);
810 		DO64(drr_write_byref.drr_key.ddk_prop);
811 		break;
812 	case DRR_WRITE_EMBEDDED:
813 		DO64(drr_write_embedded.drr_object);
814 		DO64(drr_write_embedded.drr_offset);
815 		DO64(drr_write_embedded.drr_length);
816 		DO64(drr_write_embedded.drr_toguid);
817 		DO32(drr_write_embedded.drr_lsize);
818 		DO32(drr_write_embedded.drr_psize);
819 		break;
820 	case DRR_FREE:
821 		DO64(drr_free.drr_object);
822 		DO64(drr_free.drr_offset);
823 		DO64(drr_free.drr_length);
824 		DO64(drr_free.drr_toguid);
825 		break;
826 	case DRR_SPILL:
827 		DO64(drr_spill.drr_object);
828 		DO64(drr_spill.drr_length);
829 		DO64(drr_spill.drr_toguid);
830 		break;
831 	case DRR_END:
832 		DO64(drr_end.drr_toguid);
833 		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum);
834 		break;
835 	}
836 
837 	if (drr->drr_type != DRR_BEGIN) {
838 		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum);
839 	}
840 
841 #undef DO64
842 #undef DO32
843 }
844 
845 static inline uint8_t
846 deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size)
847 {
848 	if (bonus_type == DMU_OT_SA) {
849 		return (1);
850 	} else {
851 		return (1 +
852 		    ((DN_OLD_MAX_BONUSLEN -
853 		    MIN(DN_OLD_MAX_BONUSLEN, bonus_size)) >> SPA_BLKPTRSHIFT));
854 	}
855 }
856 
857 static void
858 save_resume_state(struct receive_writer_arg *rwa,
859     uint64_t object, uint64_t offset, dmu_tx_t *tx)
860 {
861 	int txgoff = dmu_tx_get_txg(tx) & TXG_MASK;
862 
863 	if (!rwa->resumable)
864 		return;
865 
866 	/*
867 	 * We use ds_resume_bytes[] != 0 to indicate that we need to
868 	 * update this on disk, so it must not be 0.
869 	 */
870 	ASSERT(rwa->bytes_read != 0);
871 
872 	/*
873 	 * We only resume from write records, which have a valid
874 	 * (non-meta-dnode) object number.
875 	 */
876 	ASSERT(object != 0);
877 
878 	/*
879 	 * For resuming to work correctly, we must receive records in order,
880 	 * sorted by object,offset.  This is checked by the callers, but
881 	 * assert it here for good measure.
882 	 */
883 	ASSERT3U(object, >=, rwa->os->os_dsl_dataset->ds_resume_object[txgoff]);
884 	ASSERT(object != rwa->os->os_dsl_dataset->ds_resume_object[txgoff] ||
885 	    offset >= rwa->os->os_dsl_dataset->ds_resume_offset[txgoff]);
886 	ASSERT3U(rwa->bytes_read, >=,
887 	    rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff]);
888 
889 	rwa->os->os_dsl_dataset->ds_resume_object[txgoff] = object;
890 	rwa->os->os_dsl_dataset->ds_resume_offset[txgoff] = offset;
891 	rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff] = rwa->bytes_read;
892 }
893 
894 static int
895 receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
896     void *data)
897 {
898 	dmu_object_info_t doi;
899 	dmu_tx_t *tx;
900 	uint64_t object;
901 	int err;
902 	uint8_t dn_slots = drro->drr_dn_slots != 0 ?
903 	    drro->drr_dn_slots : DNODE_MIN_SLOTS;
904 
905 	if (drro->drr_type == DMU_OT_NONE ||
906 	    !DMU_OT_IS_VALID(drro->drr_type) ||
907 	    !DMU_OT_IS_VALID(drro->drr_bonustype) ||
908 	    drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS ||
909 	    drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
910 	    P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
911 	    drro->drr_blksz < SPA_MINBLOCKSIZE ||
912 	    drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) ||
913 	    drro->drr_bonuslen >
914 	    DN_BONUS_SIZE(spa_maxdnodesize(dmu_objset_spa(rwa->os))) ||
915 	    dn_slots >
916 	    (spa_maxdnodesize(dmu_objset_spa(rwa->os)) >> DNODE_SHIFT)) {
917 		return (SET_ERROR(EINVAL));
918 	}
919 
920 	err = dmu_object_info(rwa->os, drro->drr_object, &doi);
921 
922 	if (err != 0 && err != ENOENT && err != EEXIST)
923 		return (SET_ERROR(EINVAL));
924 
925 	if (drro->drr_object > rwa->max_object)
926 		rwa->max_object = drro->drr_object;
927 
928 	/*
929 	 * If we are losing blkptrs or changing the block size this must
930 	 * be a new file instance.  We must clear out the previous file
931 	 * contents before we can change this type of metadata in the dnode.
932 	 */
933 	if (err == 0) {
934 		int nblkptr;
935 
936 		object = drro->drr_object;
937 
938 		nblkptr = deduce_nblkptr(drro->drr_bonustype,
939 		    drro->drr_bonuslen);
940 
941 		if (drro->drr_blksz != doi.doi_data_block_size ||
942 		    nblkptr < doi.doi_nblkptr ||
943 		    dn_slots != doi.doi_dnodesize >> DNODE_SHIFT) {
944 			err = dmu_free_long_range(rwa->os, drro->drr_object,
945 			    0, DMU_OBJECT_END);
946 			if (err != 0)
947 				return (SET_ERROR(EINVAL));
948 		}
949 	} else if (err == EEXIST) {
950 		/*
951 		 * The object requested is currently an interior slot of a
952 		 * multi-slot dnode. This will be resolved when the next txg
953 		 * is synced out, since the send stream will have told us
954 		 * to free this slot when we freed the associated dnode
955 		 * earlier in the stream.
956 		 */
957 		txg_wait_synced(dmu_objset_pool(rwa->os), 0);
958 		object = drro->drr_object;
959 	} else {
960 		/* object is free and we are about to allocate a new one */
961 		object = DMU_NEW_OBJECT;
962 	}
963 
964 	/*
965 	 * If this is a multi-slot dnode there is a chance that this
966 	 * object will expand into a slot that is already used by
967 	 * another object from the previous snapshot. We must free
968 	 * these objects before we attempt to allocate the new dnode.
969 	 */
970 	if (dn_slots > 1) {
971 		boolean_t need_sync = B_FALSE;
972 
973 		for (uint64_t slot = drro->drr_object + 1;
974 		    slot < drro->drr_object + dn_slots;
975 		    slot++) {
976 			dmu_object_info_t slot_doi;
977 
978 			err = dmu_object_info(rwa->os, slot, &slot_doi);
979 			if (err == ENOENT || err == EEXIST)
980 				continue;
981 			else if (err != 0)
982 				return (err);
983 
984 			err = dmu_free_long_object(rwa->os, slot);
985 
986 			if (err != 0)
987 				return (err);
988 
989 			need_sync = B_TRUE;
990 		}
991 
992 		if (need_sync)
993 			txg_wait_synced(dmu_objset_pool(rwa->os), 0);
994 	}
995 
996 	tx = dmu_tx_create(rwa->os);
997 	dmu_tx_hold_bonus(tx, object);
998 	err = dmu_tx_assign(tx, TXG_WAIT);
999 	if (err != 0) {
1000 		dmu_tx_abort(tx);
1001 		return (err);
1002 	}
1003 
1004 	if (object == DMU_NEW_OBJECT) {
1005 		/* currently free, want to be allocated */
1006 		err = dmu_object_claim_dnsize(rwa->os, drro->drr_object,
1007 		    drro->drr_type, drro->drr_blksz,
1008 		    drro->drr_bonustype, drro->drr_bonuslen,
1009 		    dn_slots << DNODE_SHIFT, tx);
1010 	} else if (drro->drr_type != doi.doi_type ||
1011 	    drro->drr_blksz != doi.doi_data_block_size ||
1012 	    drro->drr_bonustype != doi.doi_bonus_type ||
1013 	    drro->drr_bonuslen != doi.doi_bonus_size ||
1014 	    drro->drr_dn_slots != (doi.doi_dnodesize >> DNODE_SHIFT)) {
1015 		/* currently allocated, but with different properties */
1016 		err = dmu_object_reclaim_dnsize(rwa->os, drro->drr_object,
1017 		    drro->drr_type, drro->drr_blksz,
1018 		    drro->drr_bonustype, drro->drr_bonuslen,
1019 		    drro->drr_dn_slots << DNODE_SHIFT, tx);
1020 	}
1021 	if (err != 0) {
1022 		dmu_tx_commit(tx);
1023 		return (SET_ERROR(EINVAL));
1024 	}
1025 
1026 	dmu_object_set_checksum(rwa->os, drro->drr_object,
1027 	    drro->drr_checksumtype, tx);
1028 	dmu_object_set_compress(rwa->os, drro->drr_object,
1029 	    drro->drr_compress, tx);
1030 
1031 	if (data != NULL) {
1032 		dmu_buf_t *db;
1033 
1034 		VERIFY0(dmu_bonus_hold(rwa->os, drro->drr_object, FTAG, &db));
1035 		dmu_buf_will_dirty(db, tx);
1036 
1037 		ASSERT3U(db->db_size, >=, drro->drr_bonuslen);
1038 		bcopy(data, db->db_data, drro->drr_bonuslen);
1039 		if (rwa->byteswap) {
1040 			dmu_object_byteswap_t byteswap =
1041 			    DMU_OT_BYTESWAP(drro->drr_bonustype);
1042 			dmu_ot_byteswap[byteswap].ob_func(db->db_data,
1043 			    drro->drr_bonuslen);
1044 		}
1045 		dmu_buf_rele(db, FTAG);
1046 	}
1047 	dmu_tx_commit(tx);
1048 
1049 	return (0);
1050 }
1051 
1052 /* ARGSUSED */
1053 static int
1054 receive_freeobjects(struct receive_writer_arg *rwa,
1055     struct drr_freeobjects *drrfo)
1056 {
1057 	uint64_t obj;
1058 	int next_err = 0;
1059 
1060 	if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
1061 		return (SET_ERROR(EINVAL));
1062 
1063 	for (obj = drrfo->drr_firstobj == 0 ? 1 : drrfo->drr_firstobj;
1064 	    obj < drrfo->drr_firstobj + drrfo->drr_numobjs && next_err == 0;
1065 	    next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) {
1066 		int err;
1067 
1068 		err = dmu_object_info(rwa->os, obj, NULL);
1069 		if (err == ENOENT)
1070 			continue;
1071 		else if (err != 0)
1072 			return (err);
1073 
1074 		err = dmu_free_long_object(rwa->os, obj);
1075 		if (err != 0)
1076 			return (err);
1077 
1078 		if (obj > rwa->max_object)
1079 			rwa->max_object = obj;
1080 	}
1081 	if (next_err != ESRCH)
1082 		return (next_err);
1083 	return (0);
1084 }
1085 
1086 static int
1087 receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw,
1088     arc_buf_t *abuf)
1089 {
1090 	dmu_tx_t *tx;
1091 	int err;
1092 
1093 	if (drrw->drr_offset + drrw->drr_logical_size < drrw->drr_offset ||
1094 	    !DMU_OT_IS_VALID(drrw->drr_type))
1095 		return (SET_ERROR(EINVAL));
1096 
1097 	/*
1098 	 * For resuming to work, records must be in increasing order
1099 	 * by (object, offset).
1100 	 */
1101 	if (drrw->drr_object < rwa->last_object ||
1102 	    (drrw->drr_object == rwa->last_object &&
1103 	    drrw->drr_offset < rwa->last_offset)) {
1104 		return (SET_ERROR(EINVAL));
1105 	}
1106 	rwa->last_object = drrw->drr_object;
1107 	rwa->last_offset = drrw->drr_offset;
1108 
1109 	if (rwa->last_object > rwa->max_object)
1110 		rwa->max_object = rwa->last_object;
1111 
1112 	if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0)
1113 		return (SET_ERROR(EINVAL));
1114 
1115 	tx = dmu_tx_create(rwa->os);
1116 
1117 	dmu_tx_hold_write(tx, drrw->drr_object,
1118 	    drrw->drr_offset, drrw->drr_logical_size);
1119 	err = dmu_tx_assign(tx, TXG_WAIT);
1120 	if (err != 0) {
1121 		dmu_tx_abort(tx);
1122 		return (err);
1123 	}
1124 	if (rwa->byteswap) {
1125 		dmu_object_byteswap_t byteswap =
1126 		    DMU_OT_BYTESWAP(drrw->drr_type);
1127 		dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
1128 		    DRR_WRITE_PAYLOAD_SIZE(drrw));
1129 	}
1130 
1131 	/* use the bonus buf to look up the dnode in dmu_assign_arcbuf */
1132 	dmu_buf_t *bonus;
1133 	if (dmu_bonus_hold(rwa->os, drrw->drr_object, FTAG, &bonus) != 0)
1134 		return (SET_ERROR(EINVAL));
1135 	dmu_assign_arcbuf(bonus, drrw->drr_offset, abuf, tx);
1136 
1137 	/*
1138 	 * Note: If the receive fails, we want the resume stream to start
1139 	 * with the same record that we last successfully received (as opposed
1140 	 * to the next record), so that we can verify that we are
1141 	 * resuming from the correct location.
1142 	 */
1143 	save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);
1144 	dmu_tx_commit(tx);
1145 	dmu_buf_rele(bonus, FTAG);
1146 
1147 	return (0);
1148 }
1149 
1150 /*
1151  * Handle a DRR_WRITE_BYREF record.  This record is used in dedup'ed
1152  * streams to refer to a copy of the data that is already on the
1153  * system because it came in earlier in the stream.  This function
1154  * finds the earlier copy of the data, and uses that copy instead of
1155  * data from the stream to fulfill this write.
1156  */
1157 static int
1158 receive_write_byref(struct receive_writer_arg *rwa,
1159     struct drr_write_byref *drrwbr)
1160 {
1161 	dmu_tx_t *tx;
1162 	int err;
1163 	guid_map_entry_t gmesrch;
1164 	guid_map_entry_t *gmep;
1165 	avl_index_t where;
1166 	objset_t *ref_os = NULL;
1167 	dmu_buf_t *dbp;
1168 
1169 	if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset)
1170 		return (SET_ERROR(EINVAL));
1171 
1172 	/*
1173 	 * If the GUID of the referenced dataset is different from the
1174 	 * GUID of the target dataset, find the referenced dataset.
1175 	 */
1176 	if (drrwbr->drr_toguid != drrwbr->drr_refguid) {
1177 		gmesrch.guid = drrwbr->drr_refguid;
1178 		if ((gmep = avl_find(rwa->guid_to_ds_map, &gmesrch,
1179 		    &where)) == NULL) {
1180 			return (SET_ERROR(EINVAL));
1181 		}
1182 		if (dmu_objset_from_ds(gmep->gme_ds, &ref_os))
1183 			return (SET_ERROR(EINVAL));
1184 	} else {
1185 		ref_os = rwa->os;
1186 	}
1187 
1188 	if (drrwbr->drr_object > rwa->max_object)
1189 		rwa->max_object = drrwbr->drr_object;
1190 
1191 	err = dmu_buf_hold(ref_os, drrwbr->drr_refobject,
1192 	    drrwbr->drr_refoffset, FTAG, &dbp, DMU_READ_PREFETCH);
1193 	if (err != 0)
1194 		return (err);
1195 
1196 	tx = dmu_tx_create(rwa->os);
1197 
1198 	dmu_tx_hold_write(tx, drrwbr->drr_object,
1199 	    drrwbr->drr_offset, drrwbr->drr_length);
1200 	err = dmu_tx_assign(tx, TXG_WAIT);
1201 	if (err != 0) {
1202 		dmu_tx_abort(tx);
1203 		return (err);
1204 	}
1205 	dmu_write(rwa->os, drrwbr->drr_object,
1206 	    drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx);
1207 	dmu_buf_rele(dbp, FTAG);
1208 
1209 	/* See comment in restore_write. */
1210 	save_resume_state(rwa, drrwbr->drr_object, drrwbr->drr_offset, tx);
1211 	dmu_tx_commit(tx);
1212 	return (0);
1213 }
1214 
1215 static int
1216 receive_write_embedded(struct receive_writer_arg *rwa,
1217     struct drr_write_embedded *drrwe, void *data)
1218 {
1219 	dmu_tx_t *tx;
1220 	int err;
1221 
1222 	if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset)
1223 		return (EINVAL);
1224 
1225 	if (drrwe->drr_psize > BPE_PAYLOAD_SIZE)
1226 		return (EINVAL);
1227 
1228 	if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES)
1229 		return (EINVAL);
1230 	if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS)
1231 		return (EINVAL);
1232 
1233 	if (drrwe->drr_object > rwa->max_object)
1234 		rwa->max_object = drrwe->drr_object;
1235 
1236 	tx = dmu_tx_create(rwa->os);
1237 
1238 	dmu_tx_hold_write(tx, drrwe->drr_object,
1239 	    drrwe->drr_offset, drrwe->drr_length);
1240 	err = dmu_tx_assign(tx, TXG_WAIT);
1241 	if (err != 0) {
1242 		dmu_tx_abort(tx);
1243 		return (err);
1244 	}
1245 
1246 	dmu_write_embedded(rwa->os, drrwe->drr_object,
1247 	    drrwe->drr_offset, data, drrwe->drr_etype,
1248 	    drrwe->drr_compression, drrwe->drr_lsize, drrwe->drr_psize,
1249 	    rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx);
1250 
1251 	/* See comment in restore_write. */
1252 	save_resume_state(rwa, drrwe->drr_object, drrwe->drr_offset, tx);
1253 	dmu_tx_commit(tx);
1254 	return (0);
1255 }
1256 
1257 static int
1258 receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
1259     void *data)
1260 {
1261 	dmu_tx_t *tx;
1262 	dmu_buf_t *db, *db_spill;
1263 	int err;
1264 
1265 	if (drrs->drr_length < SPA_MINBLOCKSIZE ||
1266 	    drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os)))
1267 		return (SET_ERROR(EINVAL));
1268 
1269 	if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0)
1270 		return (SET_ERROR(EINVAL));
1271 
1272 	if (drrs->drr_object > rwa->max_object)
1273 		rwa->max_object = drrs->drr_object;
1274 
1275 	VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db));
1276 	if ((err = dmu_spill_hold_by_bonus(db, FTAG, &db_spill)) != 0) {
1277 		dmu_buf_rele(db, FTAG);
1278 		return (err);
1279 	}
1280 
1281 	tx = dmu_tx_create(rwa->os);
1282 
1283 	dmu_tx_hold_spill(tx, db->db_object);
1284 
1285 	err = dmu_tx_assign(tx, TXG_WAIT);
1286 	if (err != 0) {
1287 		dmu_buf_rele(db, FTAG);
1288 		dmu_buf_rele(db_spill, FTAG);
1289 		dmu_tx_abort(tx);
1290 		return (err);
1291 	}
1292 	dmu_buf_will_dirty(db_spill, tx);
1293 
1294 	if (db_spill->db_size < drrs->drr_length)
1295 		VERIFY(0 == dbuf_spill_set_blksz(db_spill,
1296 		    drrs->drr_length, tx));
1297 	bcopy(data, db_spill->db_data, drrs->drr_length);
1298 
1299 	dmu_buf_rele(db, FTAG);
1300 	dmu_buf_rele(db_spill, FTAG);
1301 
1302 	dmu_tx_commit(tx);
1303 	return (0);
1304 }
1305 
1306 /* ARGSUSED */
1307 static int
1308 receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf)
1309 {
1310 	int err;
1311 
1312 	if (drrf->drr_length != -1ULL &&
1313 	    drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
1314 		return (SET_ERROR(EINVAL));
1315 
1316 	if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0)
1317 		return (SET_ERROR(EINVAL));
1318 
1319 	if (drrf->drr_object > rwa->max_object)
1320 		rwa->max_object = drrf->drr_object;
1321 
1322 	err = dmu_free_long_range(rwa->os, drrf->drr_object,
1323 	    drrf->drr_offset, drrf->drr_length);
1324 
1325 	return (err);
1326 }
1327 
1328 /* used to destroy the drc_ds on error */
1329 static void
1330 dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
1331 {
1332 	if (drc->drc_resumable) {
1333 		/* wait for our resume state to be written to disk */
1334 		txg_wait_synced(drc->drc_ds->ds_dir->dd_pool, 0);
1335 		dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
1336 	} else {
1337 		char name[ZFS_MAX_DATASET_NAME_LEN];
1338 		dsl_dataset_name(drc->drc_ds, name);
1339 		dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
1340 		(void) dsl_destroy_head(name);
1341 	}
1342 }
1343 
1344 static void
1345 receive_cksum(struct receive_arg *ra, int len, void *buf)
1346 {
1347 	if (ra->byteswap) {
1348 		(void) fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
1349 	} else {
1350 		(void) fletcher_4_incremental_native(buf, len, &ra->cksum);
1351 	}
1352 }
1353 
1354 /*
1355  * Read the payload into a buffer of size len, and update the current record's
1356  * payload field.
1357  * Allocate ra->next_rrd and read the next record's header into
1358  * ra->next_rrd->header.
1359  * Verify checksum of payload and next record.
1360  */
1361 static int
1362 receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf)
1363 {
1364 	int err;
1365 
1366 	if (len != 0) {
1367 		ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
1368 		err = receive_read(ra, len, buf);
1369 		if (err != 0)
1370 			return (err);
1371 		receive_cksum(ra, len, buf);
1372 
1373 		/* note: rrd is NULL when reading the begin record's payload */
1374 		if (ra->rrd != NULL) {
1375 			ra->rrd->payload = buf;
1376 			ra->rrd->payload_size = len;
1377 			ra->rrd->bytes_read = ra->bytes_read;
1378 		}
1379 	}
1380 
1381 	ra->prev_cksum = ra->cksum;
1382 
1383 	ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
1384 	err = receive_read(ra, sizeof (ra->next_rrd->header),
1385 	    &ra->next_rrd->header);
1386 	ra->next_rrd->bytes_read = ra->bytes_read;
1387 	if (err != 0) {
1388 		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
1389 		ra->next_rrd = NULL;
1390 		return (err);
1391 	}
1392 	if (ra->next_rrd->header.drr_type == DRR_BEGIN) {
1393 		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
1394 		ra->next_rrd = NULL;
1395 		return (SET_ERROR(EINVAL));
1396 	}
1397 
1398 	/*
1399 	 * Note: checksum is of everything up to but not including the
1400 	 * checksum itself.
1401 	 */
1402 	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
1403 	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
1404 	receive_cksum(ra,
1405 	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
1406 	    &ra->next_rrd->header);
1407 
1408 	zio_cksum_t cksum_orig =
1409 	    ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
1410 	zio_cksum_t *cksump =
1411 	    &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
1412 
1413 	if (ra->byteswap)
1414 		byteswap_record(&ra->next_rrd->header);
1415 
1416 	if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
1417 	    !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
1418 		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
1419 		ra->next_rrd = NULL;
1420 		return (SET_ERROR(ECKSUM));
1421 	}
1422 
1423 	receive_cksum(ra, sizeof (cksum_orig), &cksum_orig);
1424 
1425 	return (0);
1426 }
1427 
1428 static void
1429 objlist_create(struct objlist *list)
1430 {
1431 	list_create(&list->list, sizeof (struct receive_objnode),
1432 	    offsetof(struct receive_objnode, node));
1433 	list->last_lookup = 0;
1434 }
1435 
1436 static void
1437 objlist_destroy(struct objlist *list)
1438 {
1439 	for (struct receive_objnode *n = list_remove_head(&list->list);
1440 	    n != NULL; n = list_remove_head(&list->list)) {
1441 		kmem_free(n, sizeof (*n));
1442 	}
1443 	list_destroy(&list->list);
1444 }
1445 
1446 /*
1447  * This function looks through the objlist to see if the specified object number
1448  * is contained in the objlist.  In the process, it will remove all object
1449  * numbers in the list that are smaller than the specified object number.  Thus,
1450  * any lookup of an object number smaller than a previously looked up object
1451  * number will always return false; therefore, all lookups should be done in
1452  * ascending order.
1453  */
1454 static boolean_t
1455 objlist_exists(struct objlist *list, uint64_t object)
1456 {
1457 	struct receive_objnode *node = list_head(&list->list);
1458 	ASSERT3U(object, >=, list->last_lookup);
1459 	list->last_lookup = object;
1460 	while (node != NULL && node->object < object) {
1461 		VERIFY3P(node, ==, list_remove_head(&list->list));
1462 		kmem_free(node, sizeof (*node));
1463 		node = list_head(&list->list);
1464 	}
1465 	return (node != NULL && node->object == object);
1466 }
1467 
1468 /*
1469  * The objlist is a list of object numbers stored in ascending order.  However,
1470  * the insertion of new object numbers does not seek out the correct location to
1471  * store a new object number; instead, it appends it to the list for simplicity.
1472  * Thus, any users must take care to only insert new object numbers in ascending
1473  * order.
1474  */
1475 static void
1476 objlist_insert(struct objlist *list, uint64_t object)
1477 {
1478 	struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP);
1479 	node->object = object;
1480 #ifdef ZFS_DEBUG
1481 	struct receive_objnode *last_object = list_tail(&list->list);
1482 	uint64_t last_objnum = (last_object != NULL ? last_object->object : 0);
1483 	ASSERT3U(node->object, >, last_objnum);
1484 #endif
1485 	list_insert_tail(&list->list, node);
1486 }
1487 
1488 /*
1489  * Issue the prefetch reads for any necessary indirect blocks.
1490  *
1491  * We use the object ignore list to tell us whether or not to issue prefetches
1492  * for a given object.  We do this for both correctness (in case the blocksize
1493  * of an object has changed) and performance (if the object doesn't exist, don't
1494  * needlessly try to issue prefetches).  We also trim the list as we go through
1495  * the stream to prevent it from growing to an unbounded size.
1496  *
1497  * The object numbers within will always be in sorted order, and any write
1498  * records we see will also be in sorted order, but they're not sorted with
1499  * respect to each other (i.e. we can get several object records before
1500  * receiving each object's write records).  As a result, once we've reached a
1501  * given object number, we can safely remove any reference to lower object
1502  * numbers in the ignore list. In practice, we receive up to 32 object records
1503  * before receiving write records, so the list can have up to 32 nodes in it.
1504  */
1505 /* ARGSUSED */
1506 static void
1507 receive_read_prefetch(struct receive_arg *ra,
1508     uint64_t object, uint64_t offset, uint64_t length)
1509 {
1510 	if (!objlist_exists(&ra->ignore_objlist, object)) {
1511 		dmu_prefetch(ra->os, object, 1, offset, length,
1512 		    ZIO_PRIORITY_SYNC_READ);
1513 	}
1514 }
1515 
1516 /*
1517  * Read records off the stream, issuing any necessary prefetches.
1518  */
1519 static int
1520 receive_read_record(struct receive_arg *ra)
1521 {
1522 	int err;
1523 
1524 	switch (ra->rrd->header.drr_type) {
1525 	case DRR_OBJECT:
1526 	{
1527 		struct drr_object *drro = &ra->rrd->header.drr_u.drr_object;
1528 		uint32_t size = P2ROUNDUP(drro->drr_bonuslen, 8);
1529 		void *buf = kmem_zalloc(size, KM_SLEEP);
1530 		dmu_object_info_t doi;
1531 		err = receive_read_payload_and_next_header(ra, size, buf);
1532 		if (err != 0) {
1533 			kmem_free(buf, size);
1534 			return (err);
1535 		}
1536 		err = dmu_object_info(ra->os, drro->drr_object, &doi);
1537 		/*
1538 		 * See receive_read_prefetch for an explanation why we're
1539 		 * storing this object in the ignore_obj_list.
1540 		 */
1541 		if (err == ENOENT ||
1542 		    (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) {
1543 			objlist_insert(&ra->ignore_objlist, drro->drr_object);
1544 			err = 0;
1545 		}
1546 		return (err);
1547 	}
1548 	case DRR_FREEOBJECTS:
1549 	{
1550 		err = receive_read_payload_and_next_header(ra, 0, NULL);
1551 		return (err);
1552 	}
1553 	case DRR_WRITE:
1554 	{
1555 		struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write;
1556 		arc_buf_t *abuf;
1557 		boolean_t is_meta = DMU_OT_IS_METADATA(drrw->drr_type);
1558 		if (DRR_WRITE_COMPRESSED(drrw)) {
1559 			ASSERT3U(drrw->drr_compressed_size, >, 0);
1560 			ASSERT3U(drrw->drr_logical_size, >=,
1561 			    drrw->drr_compressed_size);
1562 			ASSERT(!is_meta);
1563 			abuf = arc_loan_compressed_buf(
1564 			    dmu_objset_spa(ra->os),
1565 			    drrw->drr_compressed_size, drrw->drr_logical_size,
1566 			    drrw->drr_compressiontype);
1567 		} else {
1568 			abuf = arc_loan_buf(dmu_objset_spa(ra->os),
1569 			    is_meta, drrw->drr_logical_size);
1570 		}
1571 
1572 		err = receive_read_payload_and_next_header(ra,
1573 		    DRR_WRITE_PAYLOAD_SIZE(drrw), abuf->b_data);
1574 		if (err != 0) {
1575 			dmu_return_arcbuf(abuf);
1576 			return (err);
1577 		}
1578 		ra->rrd->write_buf = abuf;
1579 		receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset,
1580 		    drrw->drr_logical_size);
1581 		return (err);
1582 	}
1583 	case DRR_WRITE_BYREF:
1584 	{
1585 		struct drr_write_byref *drrwb =
1586 		    &ra->rrd->header.drr_u.drr_write_byref;
1587 		err = receive_read_payload_and_next_header(ra, 0, NULL);
1588 		receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset,
1589 		    drrwb->drr_length);
1590 		return (err);
1591 	}
1592 	case DRR_WRITE_EMBEDDED:
1593 	{
1594 		struct drr_write_embedded *drrwe =
1595 		    &ra->rrd->header.drr_u.drr_write_embedded;
1596 		uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8);
1597 		void *buf = kmem_zalloc(size, KM_SLEEP);
1598 
1599 		err = receive_read_payload_and_next_header(ra, size, buf);
1600 		if (err != 0) {
1601 			kmem_free(buf, size);
1602 			return (err);
1603 		}
1604 
1605 		receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset,
1606 		    drrwe->drr_length);
1607 		return (err);
1608 	}
1609 	case DRR_FREE:
1610 	{
1611 		/*
1612 		 * It might be beneficial to prefetch indirect blocks here, but
1613 		 * we don't really have the data to decide for sure.
1614 		 */
1615 		err = receive_read_payload_and_next_header(ra, 0, NULL);
1616 		return (err);
1617 	}
1618 	case DRR_END:
1619 	{
1620 		struct drr_end *drre = &ra->rrd->header.drr_u.drr_end;
1621 		if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum))
1622 			return (SET_ERROR(ECKSUM));
1623 		return (0);
1624 	}
1625 	case DRR_SPILL:
1626 	{
1627 		struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill;
1628 		void *buf = kmem_zalloc(drrs->drr_length, KM_SLEEP);
1629 		err = receive_read_payload_and_next_header(ra, drrs->drr_length,
1630 		    buf);
1631 		if (err != 0)
1632 			kmem_free(buf, drrs->drr_length);
1633 		return (err);
1634 	}
1635 	default:
1636 		return (SET_ERROR(EINVAL));
1637 	}
1638 }
1639 
1640 /*
1641  * Commit the records to the pool.
1642  */
1643 static int
1644 receive_process_record(struct receive_writer_arg *rwa,
1645     struct receive_record_arg *rrd)
1646 {
1647 	int err;
1648 
1649 	/* Processing in order, therefore bytes_read should be increasing. */
1650 	ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
1651 	rwa->bytes_read = rrd->bytes_read;
1652 
1653 	switch (rrd->header.drr_type) {
1654 	case DRR_OBJECT:
1655 	{
1656 		struct drr_object *drro = &rrd->header.drr_u.drr_object;
1657 		err = receive_object(rwa, drro, rrd->payload);
1658 		kmem_free(rrd->payload, rrd->payload_size);
1659 		rrd->payload = NULL;
1660 		return (err);
1661 	}
1662 	case DRR_FREEOBJECTS:
1663 	{
1664 		struct drr_freeobjects *drrfo =
1665 		    &rrd->header.drr_u.drr_freeobjects;
1666 		return (receive_freeobjects(rwa, drrfo));
1667 	}
1668 	case DRR_WRITE:
1669 	{
1670 		struct drr_write *drrw = &rrd->header.drr_u.drr_write;
1671 		err = receive_write(rwa, drrw, rrd->write_buf);
1672 		/* if receive_write() is successful, it consumes the arc_buf */
1673 		if (err != 0)
1674 			dmu_return_arcbuf(rrd->write_buf);
1675 		rrd->write_buf = NULL;
1676 		rrd->payload = NULL;
1677 		return (err);
1678 	}
1679 	case DRR_WRITE_BYREF:
1680 	{
1681 		struct drr_write_byref *drrwbr =
1682 		    &rrd->header.drr_u.drr_write_byref;
1683 		return (receive_write_byref(rwa, drrwbr));
1684 	}
1685 	case DRR_WRITE_EMBEDDED:
1686 	{
1687 		struct drr_write_embedded *drrwe =
1688 		    &rrd->header.drr_u.drr_write_embedded;
1689 		err = receive_write_embedded(rwa, drrwe, rrd->payload);
1690 		kmem_free(rrd->payload, rrd->payload_size);
1691 		rrd->payload = NULL;
1692 		return (err);
1693 	}
1694 	case DRR_FREE:
1695 	{
1696 		struct drr_free *drrf = &rrd->header.drr_u.drr_free;
1697 		return (receive_free(rwa, drrf));
1698 	}
1699 	case DRR_SPILL:
1700 	{
1701 		struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
1702 		err = receive_spill(rwa, drrs, rrd->payload);
1703 		kmem_free(rrd->payload, rrd->payload_size);
1704 		rrd->payload = NULL;
1705 		return (err);
1706 	}
1707 	default:
1708 		return (SET_ERROR(EINVAL));
1709 	}
1710 }
1711 
1712 /*
1713  * dmu_recv_stream's worker thread; pull records off the queue, and then call
1714  * receive_process_record  When we're done, signal the main thread and exit.
1715  */
1716 static void
1717 receive_writer_thread(void *arg)
1718 {
1719 	struct receive_writer_arg *rwa = arg;
1720 	struct receive_record_arg *rrd;
1721 	for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker;
1722 	    rrd = bqueue_dequeue(&rwa->q)) {
1723 		/*
1724 		 * If there's an error, the main thread will stop putting things
1725 		 * on the queue, but we need to clear everything in it before we
1726 		 * can exit.
1727 		 */
1728 		if (rwa->err == 0) {
1729 			rwa->err = receive_process_record(rwa, rrd);
1730 		} else if (rrd->write_buf != NULL) {
1731 			dmu_return_arcbuf(rrd->write_buf);
1732 			rrd->write_buf = NULL;
1733 			rrd->payload = NULL;
1734 		} else if (rrd->payload != NULL) {
1735 			kmem_free(rrd->payload, rrd->payload_size);
1736 			rrd->payload = NULL;
1737 		}
1738 		kmem_free(rrd, sizeof (*rrd));
1739 	}
1740 	kmem_free(rrd, sizeof (*rrd));
1741 	mutex_enter(&rwa->mutex);
1742 	rwa->done = B_TRUE;
1743 	cv_signal(&rwa->cv);
1744 	mutex_exit(&rwa->mutex);
1745 	thread_exit();
1746 }
1747 
1748 static int
1749 resume_check(struct receive_arg *ra, nvlist_t *begin_nvl)
1750 {
1751 	uint64_t val;
1752 	objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset;
1753 	uint64_t dsobj = dmu_objset_id(ra->os);
1754 	uint64_t resume_obj, resume_off;
1755 
1756 	if (nvlist_lookup_uint64(begin_nvl,
1757 	    "resume_object", &resume_obj) != 0 ||
1758 	    nvlist_lookup_uint64(begin_nvl,
1759 	    "resume_offset", &resume_off) != 0) {
1760 		return (SET_ERROR(EINVAL));
1761 	}
1762 	VERIFY0(zap_lookup(mos, dsobj,
1763 	    DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val));
1764 	if (resume_obj != val)
1765 		return (SET_ERROR(EINVAL));
1766 	VERIFY0(zap_lookup(mos, dsobj,
1767 	    DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val));
1768 	if (resume_off != val)
1769 		return (SET_ERROR(EINVAL));
1770 
1771 	return (0);
1772 }
1773 
1774 /*
1775  * Read in the stream's records, one by one, and apply them to the pool.  There
1776  * are two threads involved; the thread that calls this function will spin up a
1777  * worker thread, read the records off the stream one by one, and issue
1778  * prefetches for any necessary indirect blocks.  It will then push the records
1779  * onto an internal blocking queue.  The worker thread will pull the records off
1780  * the queue, and actually write the data into the DMU.  This way, the worker
1781  * thread doesn't have to wait for reads to complete, since everything it needs
1782  * (the indirect blocks) will be prefetched.
1783  *
1784  * NB: callers *must* call dmu_recv_end() if this succeeds.
1785  */
1786 int
1787 dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp,
1788     int cleanup_fd, uint64_t *action_handlep)
1789 {
1790 	int err = 0;
1791 	struct receive_arg ra = { 0 };
1792 	struct receive_writer_arg rwa = { 0 };
1793 	int featureflags;
1794 	nvlist_t *begin_nvl = NULL;
1795 
1796 	ra.byteswap = drc->drc_byteswap;
1797 	ra.cksum = drc->drc_cksum;
1798 	ra.vp = vp;
1799 	ra.voff = *voffp;
1800 
1801 	if (dsl_dataset_is_zapified(drc->drc_ds)) {
1802 		(void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset,
1803 		    drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES,
1804 		    sizeof (ra.bytes_read), 1, &ra.bytes_read);
1805 	}
1806 
1807 	objlist_create(&ra.ignore_objlist);
1808 
1809 	/* these were verified in dmu_recv_begin */
1810 	ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==,
1811 	    DMU_SUBSTREAM);
1812 	ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES);
1813 
1814 	/*
1815 	 * Open the objset we are modifying.
1816 	 */
1817 	VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra.os));
1818 
1819 	ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT);
1820 
1821 	featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
1822 
1823 	/* if this stream is dedup'ed, set up the avl tree for guid mapping */
1824 	if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
1825 		minor_t minor;
1826 
1827 		if (cleanup_fd == -1) {
1828 			ra.err = SET_ERROR(EBADF);
1829 			goto out;
1830 		}
1831 		ra.err = zfs_onexit_fd_hold(cleanup_fd, &minor);
1832 		if (ra.err != 0) {
1833 			cleanup_fd = -1;
1834 			goto out;
1835 		}
1836 
1837 		if (*action_handlep == 0) {
1838 			rwa.guid_to_ds_map =
1839 			    kmem_alloc(sizeof (avl_tree_t), KM_SLEEP);
1840 			avl_create(rwa.guid_to_ds_map, guid_compare,
1841 			    sizeof (guid_map_entry_t),
1842 			    offsetof(guid_map_entry_t, avlnode));
1843 			err = zfs_onexit_add_cb(minor,
1844 			    free_guid_map_onexit, rwa.guid_to_ds_map,
1845 			    action_handlep);
1846 			if (ra.err != 0)
1847 				goto out;
1848 		} else {
1849 			err = zfs_onexit_cb_data(minor, *action_handlep,
1850 			    (void **)&rwa.guid_to_ds_map);
1851 			if (ra.err != 0)
1852 				goto out;
1853 		}
1854 
1855 		drc->drc_guid_to_ds_map = rwa.guid_to_ds_map;
1856 	}
1857 
1858 	uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen;
1859 	void *payload = NULL;
1860 	if (payloadlen != 0)
1861 		payload = kmem_alloc(payloadlen, KM_SLEEP);
1862 
1863 	err = receive_read_payload_and_next_header(&ra, payloadlen, payload);
1864 	if (err != 0) {
1865 		if (payloadlen != 0)
1866 			kmem_free(payload, payloadlen);
1867 		goto out;
1868 	}
1869 	if (payloadlen != 0) {
1870 		err = nvlist_unpack(payload, payloadlen, &begin_nvl, KM_SLEEP);
1871 		kmem_free(payload, payloadlen);
1872 		if (err != 0)
1873 			goto out;
1874 	}
1875 
1876 	if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
1877 		err = resume_check(&ra, begin_nvl);
1878 		if (err != 0)
1879 			goto out;
1880 	}
1881 
1882 	(void) bqueue_init(&rwa.q, zfs_recv_queue_length,
1883 	    offsetof(struct receive_record_arg, node));
1884 	cv_init(&rwa.cv, NULL, CV_DEFAULT, NULL);
1885 	mutex_init(&rwa.mutex, NULL, MUTEX_DEFAULT, NULL);
1886 	rwa.os = ra.os;
1887 	rwa.byteswap = drc->drc_byteswap;
1888 	rwa.resumable = drc->drc_resumable;
1889 
1890 	(void) thread_create(NULL, 0, receive_writer_thread, &rwa, 0, curproc,
1891 	    TS_RUN, minclsyspri);
1892 	/*
1893 	 * We're reading rwa.err without locks, which is safe since we are the
1894 	 * only reader, and the worker thread is the only writer.  It's ok if we
1895 	 * miss a write for an iteration or two of the loop, since the writer
1896 	 * thread will keep freeing records we send it until we send it an eos
1897 	 * marker.
1898 	 *
1899 	 * We can leave this loop in 3 ways:  First, if rwa.err is
1900 	 * non-zero.  In that case, the writer thread will free the rrd we just
1901 	 * pushed.  Second, if  we're interrupted; in that case, either it's the
1902 	 * first loop and ra.rrd was never allocated, or it's later, and ra.rrd
1903 	 * has been handed off to the writer thread who will free it.  Finally,
1904 	 * if receive_read_record fails or we're at the end of the stream, then
1905 	 * we free ra.rrd and exit.
1906 	 */
1907 	while (rwa.err == 0) {
1908 		if (issig(JUSTLOOKING) && issig(FORREAL)) {
1909 			err = SET_ERROR(EINTR);
1910 			break;
1911 		}
1912 
1913 		ASSERT3P(ra.rrd, ==, NULL);
1914 		ra.rrd = ra.next_rrd;
1915 		ra.next_rrd = NULL;
1916 		/* Allocates and loads header into ra.next_rrd */
1917 		err = receive_read_record(&ra);
1918 
1919 		if (ra.rrd->header.drr_type == DRR_END || err != 0) {
1920 			kmem_free(ra.rrd, sizeof (*ra.rrd));
1921 			ra.rrd = NULL;
1922 			break;
1923 		}
1924 
1925 		bqueue_enqueue(&rwa.q, ra.rrd,
1926 		    sizeof (struct receive_record_arg) + ra.rrd->payload_size);
1927 		ra.rrd = NULL;
1928 	}
1929 	if (ra.next_rrd == NULL)
1930 		ra.next_rrd = kmem_zalloc(sizeof (*ra.next_rrd), KM_SLEEP);
1931 	ra.next_rrd->eos_marker = B_TRUE;
1932 	bqueue_enqueue(&rwa.q, ra.next_rrd, 1);
1933 
1934 	mutex_enter(&rwa.mutex);
1935 	while (!rwa.done) {
1936 		cv_wait(&rwa.cv, &rwa.mutex);
1937 	}
1938 	mutex_exit(&rwa.mutex);
1939 
1940 	/*
1941 	 * If we are receiving a full stream as a clone, all object IDs which
1942 	 * are greater than the maximum ID referenced in the stream are
1943 	 * by definition unused and must be freed. Note that it's possible that
1944 	 * we've resumed this send and the first record we received was the END
1945 	 * record. In that case, max_object would be 0, but we shouldn't start
1946 	 * freeing all objects from there; instead we should start from the
1947 	 * resumeobj.
1948 	 */
1949 	if (drc->drc_clone && drc->drc_drrb->drr_fromguid == 0) {
1950 		uint64_t obj;
1951 		if (nvlist_lookup_uint64(begin_nvl, "resume_object", &obj) != 0)
1952 			obj = 0;
1953 		if (rwa.max_object > obj)
1954 			obj = rwa.max_object;
1955 		obj++;
1956 		int free_err = 0;
1957 		int next_err = 0;
1958 
1959 		while (next_err == 0) {
1960 			free_err = dmu_free_long_object(rwa.os, obj);
1961 			if (free_err != 0 && free_err != ENOENT)
1962 				break;
1963 
1964 			next_err = dmu_object_next(rwa.os, &obj, FALSE, 0);
1965 		}
1966 
1967 		if (err == 0) {
1968 			if (free_err != 0 && free_err != ENOENT)
1969 				err = free_err;
1970 			else if (next_err != ESRCH)
1971 				err = next_err;
1972 		}
1973 	}
1974 
1975 	cv_destroy(&rwa.cv);
1976 	mutex_destroy(&rwa.mutex);
1977 	bqueue_destroy(&rwa.q);
1978 	if (err == 0)
1979 		err = rwa.err;
1980 
1981 out:
1982 	nvlist_free(begin_nvl);
1983 	if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1))
1984 		zfs_onexit_fd_rele(cleanup_fd);
1985 
1986 	if (err != 0) {
1987 		/*
1988 		 * Clean up references. If receive is not resumable,
1989 		 * destroy what we created, so we don't leave it in
1990 		 * the inconsistent state.
1991 		 */
1992 		dmu_recv_cleanup_ds(drc);
1993 	}
1994 
1995 	*voffp = ra.voff;
1996 	objlist_destroy(&ra.ignore_objlist);
1997 	return (err);
1998 }
1999 
2000 static int
2001 dmu_recv_end_check(void *arg, dmu_tx_t *tx)
2002 {
2003 	dmu_recv_cookie_t *drc = arg;
2004 	dsl_pool_t *dp = dmu_tx_pool(tx);
2005 	int error;
2006 
2007 	ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag);
2008 
2009 	if (!drc->drc_newfs) {
2010 		dsl_dataset_t *origin_head;
2011 
2012 		error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head);
2013 		if (error != 0)
2014 			return (error);
2015 		if (drc->drc_force) {
2016 			/*
2017 			 * We will destroy any snapshots in tofs (i.e. before
2018 			 * origin_head) that are after the origin (which is
2019 			 * the snap before drc_ds, because drc_ds can not
2020 			 * have any snaps of its own).
2021 			 */
2022 			uint64_t obj;
2023 
2024 			obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
2025 			while (obj !=
2026 			    dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
2027 				dsl_dataset_t *snap;
2028 				error = dsl_dataset_hold_obj(dp, obj, FTAG,
2029 				    &snap);
2030 				if (error != 0)
2031 					break;
2032 				if (snap->ds_dir != origin_head->ds_dir)
2033 					error = SET_ERROR(EINVAL);
2034 				if (error == 0)  {
2035 					error = dsl_destroy_snapshot_check_impl(
2036 					    snap, B_FALSE);
2037 				}
2038 				obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
2039 				dsl_dataset_rele(snap, FTAG);
2040 				if (error != 0)
2041 					break;
2042 			}
2043 			if (error != 0) {
2044 				dsl_dataset_rele(origin_head, FTAG);
2045 				return (error);
2046 			}
2047 		}
2048 		error = dsl_dataset_clone_swap_check_impl(drc->drc_ds,
2049 		    origin_head, drc->drc_force, drc->drc_owner, tx);
2050 		if (error != 0) {
2051 			dsl_dataset_rele(origin_head, FTAG);
2052 			return (error);
2053 		}
2054 		error = dsl_dataset_snapshot_check_impl(origin_head,
2055 		    drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
2056 		dsl_dataset_rele(origin_head, FTAG);
2057 		if (error != 0)
2058 			return (error);
2059 
2060 		error = dsl_destroy_head_check_impl(drc->drc_ds, 1);
2061 	} else {
2062 		error = dsl_dataset_snapshot_check_impl(drc->drc_ds,
2063 		    drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
2064 	}
2065 	return (error);
2066 }
2067 
2068 static void
2069 dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
2070 {
2071 	dmu_recv_cookie_t *drc = arg;
2072 	dsl_pool_t *dp = dmu_tx_pool(tx);
2073 
2074 	spa_history_log_internal_ds(drc->drc_ds, "finish receiving",
2075 	    tx, "snap=%s", drc->drc_tosnap);
2076 
2077 	if (!drc->drc_newfs) {
2078 		dsl_dataset_t *origin_head;
2079 
2080 		VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
2081 		    &origin_head));
2082 
2083 		if (drc->drc_force) {
2084 			/*
2085 			 * Destroy any snapshots of drc_tofs (origin_head)
2086 			 * after the origin (the snap before drc_ds).
2087 			 */
2088 			uint64_t obj;
2089 
2090 			obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
2091 			while (obj !=
2092 			    dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
2093 				dsl_dataset_t *snap;
2094 				VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG,
2095 				    &snap));
2096 				ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir);
2097 				obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
2098 				dsl_destroy_snapshot_sync_impl(snap,
2099 				    B_FALSE, tx);
2100 				dsl_dataset_rele(snap, FTAG);
2101 			}
2102 		}
2103 		VERIFY3P(drc->drc_ds->ds_prev, ==,
2104 		    origin_head->ds_prev);
2105 
2106 		dsl_dataset_clone_swap_sync_impl(drc->drc_ds,
2107 		    origin_head, tx);
2108 		dsl_dataset_snapshot_sync_impl(origin_head,
2109 		    drc->drc_tosnap, tx);
2110 
2111 		/* set snapshot's creation time and guid */
2112 		dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx);
2113 		dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time =
2114 		    drc->drc_drrb->drr_creation_time;
2115 		dsl_dataset_phys(origin_head->ds_prev)->ds_guid =
2116 		    drc->drc_drrb->drr_toguid;
2117 		dsl_dataset_phys(origin_head->ds_prev)->ds_flags &=
2118 		    ~DS_FLAG_INCONSISTENT;
2119 
2120 		dmu_buf_will_dirty(origin_head->ds_dbuf, tx);
2121 		dsl_dataset_phys(origin_head)->ds_flags &=
2122 		    ~DS_FLAG_INCONSISTENT;
2123 
2124 		drc->drc_newsnapobj =
2125 		    dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
2126 
2127 		dsl_dataset_rele(origin_head, FTAG);
2128 		dsl_destroy_head_sync_impl(drc->drc_ds, tx);
2129 
2130 		if (drc->drc_owner != NULL)
2131 			VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner);
2132 	} else {
2133 		dsl_dataset_t *ds = drc->drc_ds;
2134 
2135 		dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx);
2136 
2137 		/* set snapshot's creation time and guid */
2138 		dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
2139 		dsl_dataset_phys(ds->ds_prev)->ds_creation_time =
2140 		    drc->drc_drrb->drr_creation_time;
2141 		dsl_dataset_phys(ds->ds_prev)->ds_guid =
2142 		    drc->drc_drrb->drr_toguid;
2143 		dsl_dataset_phys(ds->ds_prev)->ds_flags &=
2144 		    ~DS_FLAG_INCONSISTENT;
2145 
2146 		dmu_buf_will_dirty(ds->ds_dbuf, tx);
2147 		dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
2148 		if (dsl_dataset_has_resume_receive_state(ds)) {
2149 			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2150 			    DS_FIELD_RESUME_FROMGUID, tx);
2151 			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2152 			    DS_FIELD_RESUME_OBJECT, tx);
2153 			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2154 			    DS_FIELD_RESUME_OFFSET, tx);
2155 			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2156 			    DS_FIELD_RESUME_BYTES, tx);
2157 			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2158 			    DS_FIELD_RESUME_TOGUID, tx);
2159 			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2160 			    DS_FIELD_RESUME_TONAME, tx);
2161 		}
2162 		drc->drc_newsnapobj =
2163 		    dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj;
2164 	}
2165 	/*
2166 	 * Release the hold from dmu_recv_begin.  This must be done before
2167 	 * we return to open context, so that when we free the dataset's dnode,
2168 	 * we can evict its bonus buffer.
2169 	 */
2170 	dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
2171 	drc->drc_ds = NULL;
2172 }
2173 
2174 static int
2175 add_ds_to_guidmap(const char *name, avl_tree_t *guid_map, uint64_t snapobj)
2176 {
2177 	dsl_pool_t *dp;
2178 	dsl_dataset_t *snapds;
2179 	guid_map_entry_t *gmep;
2180 	int err;
2181 
2182 	ASSERT(guid_map != NULL);
2183 
2184 	err = dsl_pool_hold(name, FTAG, &dp);
2185 	if (err != 0)
2186 		return (err);
2187 	gmep = kmem_alloc(sizeof (*gmep), KM_SLEEP);
2188 	err = dsl_dataset_hold_obj(dp, snapobj, gmep, &snapds);
2189 	if (err == 0) {
2190 		gmep->guid = dsl_dataset_phys(snapds)->ds_guid;
2191 		gmep->gme_ds = snapds;
2192 		avl_add(guid_map, gmep);
2193 		dsl_dataset_long_hold(snapds, gmep);
2194 	} else {
2195 		kmem_free(gmep, sizeof (*gmep));
2196 	}
2197 
2198 	dsl_pool_rele(dp, FTAG);
2199 	return (err);
2200 }
2201 
2202 static int dmu_recv_end_modified_blocks = 3;
2203 
2204 static int
2205 dmu_recv_existing_end(dmu_recv_cookie_t *drc)
2206 {
2207 #ifdef _KERNEL
2208 	/*
2209 	 * We will be destroying the ds; make sure its origin is unmounted if
2210 	 * necessary.
2211 	 */
2212 	char name[ZFS_MAX_DATASET_NAME_LEN];
2213 	dsl_dataset_name(drc->drc_ds, name);
2214 	zfs_destroy_unmount_origin(name);
2215 #endif
2216 
2217 	return (dsl_sync_task(drc->drc_tofs,
2218 	    dmu_recv_end_check, dmu_recv_end_sync, drc,
2219 	    dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL));
2220 }
2221 
2222 static int
2223 dmu_recv_new_end(dmu_recv_cookie_t *drc)
2224 {
2225 	return (dsl_sync_task(drc->drc_tofs,
2226 	    dmu_recv_end_check, dmu_recv_end_sync, drc,
2227 	    dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL));
2228 }
2229 
2230 int
2231 dmu_recv_end(dmu_recv_cookie_t *drc, void *owner)
2232 {
2233 	int error;
2234 
2235 	drc->drc_owner = owner;
2236 
2237 	if (drc->drc_newfs)
2238 		error = dmu_recv_new_end(drc);
2239 	else
2240 		error = dmu_recv_existing_end(drc);
2241 
2242 	if (error != 0) {
2243 		dmu_recv_cleanup_ds(drc);
2244 	} else if (drc->drc_guid_to_ds_map != NULL) {
2245 		(void) add_ds_to_guidmap(drc->drc_tofs,
2246 		    drc->drc_guid_to_ds_map,
2247 		    drc->drc_newsnapobj);
2248 	}
2249 	return (error);
2250 }
2251 
2252 /*
2253  * Return TRUE if this objset is currently being received into.
2254  */
2255 boolean_t
2256 dmu_objset_is_receiving(objset_t *os)
2257 {
2258 	return (os->os_dsl_dataset != NULL &&
2259 	    os->os_dsl_dataset->ds_owner == dmu_recv_tag);
2260 }
2261