xref: /illumos-gate/usr/src/uts/common/fs/zfs/dsl_pool.c (revision 6f459ff5b49a8482416f3eab8866c784121ecae3)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
24  * Copyright (c) 2013 Steven Hartland. All rights reserved.
25  * Copyright (c) 2014 Spectra Logic Corporation, All rights reserved.
26  * Copyright (c) 2014 Integros [integros.com]
27  * Copyright 2016 Nexenta Systems, Inc.  All rights reserved.
28  */
29 
30 #include <sys/dsl_pool.h>
31 #include <sys/dsl_dataset.h>
32 #include <sys/dsl_prop.h>
33 #include <sys/dsl_dir.h>
34 #include <sys/dsl_synctask.h>
35 #include <sys/dsl_scan.h>
36 #include <sys/dnode.h>
37 #include <sys/dmu_tx.h>
38 #include <sys/dmu_objset.h>
39 #include <sys/arc.h>
40 #include <sys/zap.h>
41 #include <sys/zio.h>
42 #include <sys/zfs_context.h>
43 #include <sys/fs/zfs.h>
44 #include <sys/zfs_znode.h>
45 #include <sys/spa_impl.h>
46 #include <sys/dsl_deadlist.h>
47 #include <sys/bptree.h>
48 #include <sys/zfeature.h>
49 #include <sys/zil_impl.h>
50 #include <sys/dsl_userhold.h>
51 
52 /*
53  * ZFS Write Throttle
54  * ------------------
55  *
56  * ZFS must limit the rate of incoming writes to the rate at which it is able
57  * to sync data modifications to the backend storage. Throttling by too much
58  * creates an artificial limit; throttling by too little can only be sustained
59  * for short periods and would lead to highly lumpy performance. On a per-pool
60  * basis, ZFS tracks the amount of modified (dirty) data. As operations change
61  * data, the amount of dirty data increases; as ZFS syncs out data, the amount
62  * of dirty data decreases. When the amount of dirty data exceeds a
63  * predetermined threshold further modifications are blocked until the amount
64  * of dirty data decreases (as data is synced out).
65  *
66  * The limit on dirty data is tunable, and should be adjusted according to
67  * both the IO capacity and available memory of the system. The larger the
68  * window, the more ZFS is able to aggregate and amortize metadata (and data)
69  * changes. However, memory is a limited resource, and allowing for more dirty
70  * data comes at the cost of keeping other useful data in memory (for example
71  * ZFS data cached by the ARC).
72  *
73  * Implementation
74  *
75  * As buffers are modified dsl_pool_dirty_space() increments both the per-
76  * txg (dp_dirty_pertxg[]) and poolwide (dp_dirty_total) accounting of
77  * dirty space used; dsl_pool_undirty_space() decrements those values as data
78  * is synced out from dsl_pool_sync(). While only the poolwide value is
79  * relevant, the per-txg value is useful for debugging. The tunable
80  * zfs_dirty_data_max determines the dirty space limit. Once that value is
81  * exceeded, new writes are halted until space frees up.
82  *
83  * The zfs_dirty_data_sync tunable dictates the threshold at which we
84  * ensure that there is a txg syncing (see the comment in txg.c for a full
85  * description of transaction group stages).
86  *
87  * The IO scheduler uses both the dirty space limit and current amount of
88  * dirty data as inputs. Those values affect the number of concurrent IOs ZFS
89  * issues. See the comment in vdev_queue.c for details of the IO scheduler.
90  *
91  * The delay is also calculated based on the amount of dirty data.  See the
92  * comment above dmu_tx_delay() for details.
93  */
94 
/*
 * Limit on the amount of dirty data.  zfs_dirty_data_max will be set to
 * zfs_dirty_data_max_percent percent of all memory, but never to more than
 * zfs_dirty_data_max_max.  It can also be overridden in /etc/system.
 */
uint64_t zfs_dirty_data_max;
uint64_t zfs_dirty_data_max_max = 4ULL << 30;	/* 4 GiB */
int zfs_dirty_data_max_percent = 10;

/*
 * Push out a txg once at least this much data is dirty.
 */
uint64_t zfs_dirty_data_sync = 64ULL << 20;	/* 64 MiB */

/*
 * dmu_tx_delay() starts delaying each transaction once the amount of dirty
 * data passes this percentage of zfs_dirty_data_max.
 * This value should be >= zfs_vdev_async_write_active_max_dirty_percent.
 */
int zfs_delay_min_dirty_percent = 60;

/*
 * Governs how quickly the delay grows toward infinity.  A larger value
 * produces a longer delay for the same amount of dirty data, and therefore
 * less dirty data at a given throughput.
 *
 * For the smoothest delay, this should be about 1 billion divided by the
 * maximum number of operations per second; that setting smoothly handles
 * between 10x and 1/10th of that number.
 *
 * Note: zfs_delay_scale * zfs_dirty_data_max must be < 2^64, due to the
 * multiply in dmu_tx_delay().
 */
uint64_t zfs_delay_scale = 1000000000ULL / 2000;

/*
 * Determines the number of threads used by the dp_sync_taskq.
 */
int zfs_sync_taskq_batch_pct = 75;
134 
/*
 * Tunables for the taskq that zil_clean() uses to run zil_itxg_clean()
 * from the context of spa_sync().
 *
 * Cleaning an itxg list is dispatched with TQ_NOSLEEP.  When that dispatch
 * fails, zil_itxg_clean() runs synchronously inside spa_sync() instead,
 * which can hurt spa_sync() performance (for example when the itxg list
 * holds a large number of itxs that need to be cleaned).
 *
 * To make such failures unlikely, the taskq keeps pre-allocated entries:
 *
 *  - "zfs_zil_clean_taskq_minalloc" is the number of taskq entries that
 *    are pre-populated when the taskq is first created;
 *  - "zfs_zil_clean_taskq_maxalloc" is the maximum number of taskq entries
 *    that are cached after an on-demand allocation.
 *
 * The goal is that an entry is already available by the time zil_clean()
 * needs one, so that an on-demand allocation (which could fail and force
 * the synchronous path described above) is avoided.
 *
 * "zfs_zil_clean_taskq_nthr_pct" configures the number of threads used by
 * the taskq.
 */
int zfs_zil_clean_taskq_nthr_pct = 100;
int zfs_zil_clean_taskq_minalloc = 1 << 10;
int zfs_zil_clean_taskq_maxalloc = 1 << 20;
164 
165 int
166 dsl_pool_open_special_dir(dsl_pool_t *dp, const char *name, dsl_dir_t **ddp)
167 {
168 	uint64_t obj;
169 	int err;
170 
171 	err = zap_lookup(dp->dp_meta_objset,
172 	    dsl_dir_phys(dp->dp_root_dir)->dd_child_dir_zapobj,
173 	    name, sizeof (obj), 1, &obj);
174 	if (err)
175 		return (err);
176 
177 	return (dsl_dir_hold_obj(dp, obj, name, dp, ddp));
178 }
179 
180 static dsl_pool_t *
181 dsl_pool_open_impl(spa_t *spa, uint64_t txg)
182 {
183 	dsl_pool_t *dp;
184 	blkptr_t *bp = spa_get_rootblkptr(spa);
185 
186 	dp = kmem_zalloc(sizeof (dsl_pool_t), KM_SLEEP);
187 	dp->dp_spa = spa;
188 	dp->dp_meta_rootbp = *bp;
189 	rrw_init(&dp->dp_config_rwlock, B_TRUE);
190 	txg_init(dp, txg);
191 
192 	txg_list_create(&dp->dp_dirty_datasets, spa,
193 	    offsetof(dsl_dataset_t, ds_dirty_link));
194 	txg_list_create(&dp->dp_dirty_zilogs, spa,
195 	    offsetof(zilog_t, zl_dirty_link));
196 	txg_list_create(&dp->dp_dirty_dirs, spa,
197 	    offsetof(dsl_dir_t, dd_dirty_link));
198 	txg_list_create(&dp->dp_sync_tasks, spa,
199 	    offsetof(dsl_sync_task_t, dst_node));
200 
201 	dp->dp_sync_taskq = taskq_create("dp_sync_taskq",
202 	    zfs_sync_taskq_batch_pct, minclsyspri, 1, INT_MAX,
203 	    TASKQ_THREADS_CPU_PCT);
204 
205 	dp->dp_zil_clean_taskq = taskq_create("dp_zil_clean_taskq",
206 	    zfs_zil_clean_taskq_nthr_pct, minclsyspri,
207 	    zfs_zil_clean_taskq_minalloc,
208 	    zfs_zil_clean_taskq_maxalloc,
209 	    TASKQ_PREPOPULATE | TASKQ_THREADS_CPU_PCT);
210 
211 	mutex_init(&dp->dp_lock, NULL, MUTEX_DEFAULT, NULL);
212 	cv_init(&dp->dp_spaceavail_cv, NULL, CV_DEFAULT, NULL);
213 
214 	dp->dp_vnrele_taskq = taskq_create("zfs_vn_rele_taskq", 1, minclsyspri,
215 	    1, 4, 0);
216 
217 	return (dp);
218 }
219 
220 int
221 dsl_pool_init(spa_t *spa, uint64_t txg, dsl_pool_t **dpp)
222 {
223 	int err;
224 	dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);
225 
226 	err = dmu_objset_open_impl(spa, NULL, &dp->dp_meta_rootbp,
227 	    &dp->dp_meta_objset);
228 	if (err != 0)
229 		dsl_pool_close(dp);
230 	else
231 		*dpp = dp;
232 
233 	return (err);
234 }
235 
236 int
237 dsl_pool_open(dsl_pool_t *dp)
238 {
239 	int err;
240 	dsl_dir_t *dd;
241 	dsl_dataset_t *ds;
242 	uint64_t obj;
243 
244 	rrw_enter(&dp->dp_config_rwlock, RW_WRITER, FTAG);
245 	err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
246 	    DMU_POOL_ROOT_DATASET, sizeof (uint64_t), 1,
247 	    &dp->dp_root_dir_obj);
248 	if (err)
249 		goto out;
250 
251 	err = dsl_dir_hold_obj(dp, dp->dp_root_dir_obj,
252 	    NULL, dp, &dp->dp_root_dir);
253 	if (err)
254 		goto out;
255 
256 	err = dsl_pool_open_special_dir(dp, MOS_DIR_NAME, &dp->dp_mos_dir);
257 	if (err)
258 		goto out;
259 
260 	if (spa_version(dp->dp_spa) >= SPA_VERSION_ORIGIN) {
261 		err = dsl_pool_open_special_dir(dp, ORIGIN_DIR_NAME, &dd);
262 		if (err)
263 			goto out;
264 		err = dsl_dataset_hold_obj(dp,
265 		    dsl_dir_phys(dd)->dd_head_dataset_obj, FTAG, &ds);
266 		if (err == 0) {
267 			err = dsl_dataset_hold_obj(dp,
268 			    dsl_dataset_phys(ds)->ds_prev_snap_obj, dp,
269 			    &dp->dp_origin_snap);
270 			dsl_dataset_rele(ds, FTAG);
271 		}
272 		dsl_dir_rele(dd, dp);
273 		if (err)
274 			goto out;
275 	}
276 
277 	if (spa_version(dp->dp_spa) >= SPA_VERSION_DEADLISTS) {
278 		err = dsl_pool_open_special_dir(dp, FREE_DIR_NAME,
279 		    &dp->dp_free_dir);
280 		if (err)
281 			goto out;
282 
283 		err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
284 		    DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj);
285 		if (err)
286 			goto out;
287 		VERIFY0(bpobj_open(&dp->dp_free_bpobj,
288 		    dp->dp_meta_objset, obj));
289 	}
290 
291 	if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_OBSOLETE_COUNTS)) {
292 		err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
293 		    DMU_POOL_OBSOLETE_BPOBJ, sizeof (uint64_t), 1, &obj);
294 		if (err == 0) {
295 			VERIFY0(bpobj_open(&dp->dp_obsolete_bpobj,
296 			    dp->dp_meta_objset, obj));
297 		} else if (err == ENOENT) {
298 			/*
299 			 * We might not have created the remap bpobj yet.
300 			 */
301 			err = 0;
302 		} else {
303 			goto out;
304 		}
305 	}
306 
307 	/*
308 	 * Note: errors ignored, because the these special dirs, used for
309 	 * space accounting, are only created on demand.
310 	 */
311 	(void) dsl_pool_open_special_dir(dp, LEAK_DIR_NAME,
312 	    &dp->dp_leak_dir);
313 
314 	if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_ASYNC_DESTROY)) {
315 		err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
316 		    DMU_POOL_BPTREE_OBJ, sizeof (uint64_t), 1,
317 		    &dp->dp_bptree_obj);
318 		if (err != 0)
319 			goto out;
320 	}
321 
322 	if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMPTY_BPOBJ)) {
323 		err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
324 		    DMU_POOL_EMPTY_BPOBJ, sizeof (uint64_t), 1,
325 		    &dp->dp_empty_bpobj);
326 		if (err != 0)
327 			goto out;
328 	}
329 
330 	err = zap_lookup(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
331 	    DMU_POOL_TMP_USERREFS, sizeof (uint64_t), 1,
332 	    &dp->dp_tmp_userrefs_obj);
333 	if (err == ENOENT)
334 		err = 0;
335 	if (err)
336 		goto out;
337 
338 	err = dsl_scan_init(dp, dp->dp_tx.tx_open_txg);
339 
340 out:
341 	rrw_exit(&dp->dp_config_rwlock, FTAG);
342 	return (err);
343 }
344 
345 void
346 dsl_pool_close(dsl_pool_t *dp)
347 {
348 	/*
349 	 * Drop our references from dsl_pool_open().
350 	 *
351 	 * Since we held the origin_snap from "syncing" context (which
352 	 * includes pool-opening context), it actually only got a "ref"
353 	 * and not a hold, so just drop that here.
354 	 */
355 	if (dp->dp_origin_snap != NULL)
356 		dsl_dataset_rele(dp->dp_origin_snap, dp);
357 	if (dp->dp_mos_dir != NULL)
358 		dsl_dir_rele(dp->dp_mos_dir, dp);
359 	if (dp->dp_free_dir != NULL)
360 		dsl_dir_rele(dp->dp_free_dir, dp);
361 	if (dp->dp_leak_dir != NULL)
362 		dsl_dir_rele(dp->dp_leak_dir, dp);
363 	if (dp->dp_root_dir != NULL)
364 		dsl_dir_rele(dp->dp_root_dir, dp);
365 
366 	bpobj_close(&dp->dp_free_bpobj);
367 	bpobj_close(&dp->dp_obsolete_bpobj);
368 
369 	/* undo the dmu_objset_open_impl(mos) from dsl_pool_open() */
370 	if (dp->dp_meta_objset != NULL)
371 		dmu_objset_evict(dp->dp_meta_objset);
372 
373 	txg_list_destroy(&dp->dp_dirty_datasets);
374 	txg_list_destroy(&dp->dp_dirty_zilogs);
375 	txg_list_destroy(&dp->dp_sync_tasks);
376 	txg_list_destroy(&dp->dp_dirty_dirs);
377 
378 	taskq_destroy(dp->dp_zil_clean_taskq);
379 	taskq_destroy(dp->dp_sync_taskq);
380 
381 	/*
382 	 * We can't set retry to TRUE since we're explicitly specifying
383 	 * a spa to flush. This is good enough; any missed buffers for
384 	 * this spa won't cause trouble, and they'll eventually fall
385 	 * out of the ARC just like any other unused buffer.
386 	 */
387 	arc_flush(dp->dp_spa, FALSE);
388 
389 	txg_fini(dp);
390 	dsl_scan_fini(dp);
391 	dmu_buf_user_evict_wait();
392 
393 	rrw_destroy(&dp->dp_config_rwlock);
394 	mutex_destroy(&dp->dp_lock);
395 	taskq_destroy(dp->dp_vnrele_taskq);
396 	if (dp->dp_blkstats != NULL)
397 		kmem_free(dp->dp_blkstats, sizeof (zfs_all_blkstats_t));
398 	kmem_free(dp, sizeof (dsl_pool_t));
399 }
400 
401 void
402 dsl_pool_create_obsolete_bpobj(dsl_pool_t *dp, dmu_tx_t *tx)
403 {
404 	uint64_t obj;
405 	/*
406 	 * Currently, we only create the obsolete_bpobj where there are
407 	 * indirect vdevs with referenced mappings.
408 	 */
409 	ASSERT(spa_feature_is_active(dp->dp_spa, SPA_FEATURE_DEVICE_REMOVAL));
410 	/* create and open the obsolete_bpobj */
411 	obj = bpobj_alloc(dp->dp_meta_objset, SPA_OLD_MAXBLOCKSIZE, tx);
412 	VERIFY0(bpobj_open(&dp->dp_obsolete_bpobj, dp->dp_meta_objset, obj));
413 	VERIFY0(zap_add(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
414 	    DMU_POOL_OBSOLETE_BPOBJ, sizeof (uint64_t), 1, &obj, tx));
415 	spa_feature_incr(dp->dp_spa, SPA_FEATURE_OBSOLETE_COUNTS, tx);
416 }
417 
418 void
419 dsl_pool_destroy_obsolete_bpobj(dsl_pool_t *dp, dmu_tx_t *tx)
420 {
421 	spa_feature_decr(dp->dp_spa, SPA_FEATURE_OBSOLETE_COUNTS, tx);
422 	VERIFY0(zap_remove(dp->dp_meta_objset,
423 	    DMU_POOL_DIRECTORY_OBJECT,
424 	    DMU_POOL_OBSOLETE_BPOBJ, tx));
425 	bpobj_free(dp->dp_meta_objset,
426 	    dp->dp_obsolete_bpobj.bpo_object, tx);
427 	bpobj_close(&dp->dp_obsolete_bpobj);
428 }
429 
430 dsl_pool_t *
431 dsl_pool_create(spa_t *spa, nvlist_t *zplprops, uint64_t txg)
432 {
433 	int err;
434 	dsl_pool_t *dp = dsl_pool_open_impl(spa, txg);
435 	dmu_tx_t *tx = dmu_tx_create_assigned(dp, txg);
436 	dsl_dataset_t *ds;
437 	uint64_t obj;
438 
439 	rrw_enter(&dp->dp_config_rwlock, RW_WRITER, FTAG);
440 
441 	/* create and open the MOS (meta-objset) */
442 	dp->dp_meta_objset = dmu_objset_create_impl(spa,
443 	    NULL, &dp->dp_meta_rootbp, DMU_OST_META, tx);
444 
445 	/* create the pool directory */
446 	err = zap_create_claim(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
447 	    DMU_OT_OBJECT_DIRECTORY, DMU_OT_NONE, 0, tx);
448 	ASSERT0(err);
449 
450 	/* Initialize scan structures */
451 	VERIFY0(dsl_scan_init(dp, txg));
452 
453 	/* create and open the root dir */
454 	dp->dp_root_dir_obj = dsl_dir_create_sync(dp, NULL, NULL, tx);
455 	VERIFY0(dsl_dir_hold_obj(dp, dp->dp_root_dir_obj,
456 	    NULL, dp, &dp->dp_root_dir));
457 
458 	/* create and open the meta-objset dir */
459 	(void) dsl_dir_create_sync(dp, dp->dp_root_dir, MOS_DIR_NAME, tx);
460 	VERIFY0(dsl_pool_open_special_dir(dp,
461 	    MOS_DIR_NAME, &dp->dp_mos_dir));
462 
463 	if (spa_version(spa) >= SPA_VERSION_DEADLISTS) {
464 		/* create and open the free dir */
465 		(void) dsl_dir_create_sync(dp, dp->dp_root_dir,
466 		    FREE_DIR_NAME, tx);
467 		VERIFY0(dsl_pool_open_special_dir(dp,
468 		    FREE_DIR_NAME, &dp->dp_free_dir));
469 
470 		/* create and open the free_bplist */
471 		obj = bpobj_alloc(dp->dp_meta_objset, SPA_OLD_MAXBLOCKSIZE, tx);
472 		VERIFY(zap_add(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
473 		    DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj, tx) == 0);
474 		VERIFY0(bpobj_open(&dp->dp_free_bpobj,
475 		    dp->dp_meta_objset, obj));
476 	}
477 
478 	if (spa_version(spa) >= SPA_VERSION_DSL_SCRUB)
479 		dsl_pool_create_origin(dp, tx);
480 
481 	/* create the root dataset */
482 	obj = dsl_dataset_create_sync_dd(dp->dp_root_dir, NULL, 0, tx);
483 
484 	/* create the root objset */
485 	VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG, &ds));
486 #ifdef _KERNEL
487 	{
488 		objset_t *os;
489 		rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
490 		os = dmu_objset_create_impl(dp->dp_spa, ds,
491 		    dsl_dataset_get_blkptr(ds), DMU_OST_ZFS, tx);
492 		rrw_exit(&ds->ds_bp_rwlock, FTAG);
493 		zfs_create_fs(os, kcred, zplprops, tx);
494 	}
495 #endif
496 	dsl_dataset_rele(ds, FTAG);
497 
498 	dmu_tx_commit(tx);
499 
500 	rrw_exit(&dp->dp_config_rwlock, FTAG);
501 
502 	return (dp);
503 }
504 
505 /*
506  * Account for the meta-objset space in its placeholder dsl_dir.
507  */
508 void
509 dsl_pool_mos_diduse_space(dsl_pool_t *dp,
510     int64_t used, int64_t comp, int64_t uncomp)
511 {
512 	ASSERT3U(comp, ==, uncomp); /* it's all metadata */
513 	mutex_enter(&dp->dp_lock);
514 	dp->dp_mos_used_delta += used;
515 	dp->dp_mos_compressed_delta += comp;
516 	dp->dp_mos_uncompressed_delta += uncomp;
517 	mutex_exit(&dp->dp_lock);
518 }
519 
520 static void
521 dsl_pool_sync_mos(dsl_pool_t *dp, dmu_tx_t *tx)
522 {
523 	zio_t *zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
524 	dmu_objset_sync(dp->dp_meta_objset, zio, tx);
525 	VERIFY0(zio_wait(zio));
526 	dprintf_bp(&dp->dp_meta_rootbp, "meta objset rootbp is %s", "");
527 	spa_set_rootblkptr(dp->dp_spa, &dp->dp_meta_rootbp);
528 }
529 
530 static void
531 dsl_pool_dirty_delta(dsl_pool_t *dp, int64_t delta)
532 {
533 	ASSERT(MUTEX_HELD(&dp->dp_lock));
534 
535 	if (delta < 0)
536 		ASSERT3U(-delta, <=, dp->dp_dirty_total);
537 
538 	dp->dp_dirty_total += delta;
539 
540 	/*
541 	 * Note: we signal even when increasing dp_dirty_total.
542 	 * This ensures forward progress -- each thread wakes the next waiter.
543 	 */
544 	if (dp->dp_dirty_total < zfs_dirty_data_max)
545 		cv_signal(&dp->dp_spaceavail_cv);
546 }
547 
/*
 * Sync out everything that was dirtied in the pool for transaction group
 * txg.  The steps below are strictly ordered:
 *
 *  1. sync every dataset on the txg's dirty-dataset list and wait for the
 *     I/O;
 *  2. settle any remaining dirty-space accounting for the txg;
 *  3. run the user/group quota updates and wait for dp_sync_taskq;
 *  4. sync the datasets that were re-dirtied by step 3;
 *  5. run dsl_dataset_sync_done() on each synced dataset and sync the
 *     dirty dsl_dirs;
 *  6. apply the accumulated MOS space deltas and sync the MOS if it has
 *     dirty dnodes for this txg;
 *  7. run the txg's sync tasks.
 */
void
dsl_pool_sync(dsl_pool_t *dp, uint64_t txg)
{
	zio_t *zio;
	dmu_tx_t *tx;
	dsl_dir_t *dd;
	dsl_dataset_t *ds;
	objset_t *mos = dp->dp_meta_objset;
	/* datasets synced in the first pass, linked via ds_synced_link */
	list_t synced_datasets;

	list_create(&synced_datasets, sizeof (dsl_dataset_t),
	    offsetof(dsl_dataset_t, ds_synced_link));

	tx = dmu_tx_create_assigned(dp, txg);

	/*
	 * Write out all dirty blocks of dirty datasets.
	 */
	zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
	while ((ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) != NULL) {
		/*
		 * We must not sync any non-MOS datasets twice, because
		 * we may have taken a snapshot of them.  However, we
		 * may sync newly-created datasets on pass 2.
		 */
		ASSERT(!list_link_active(&ds->ds_synced_link));
		list_insert_tail(&synced_datasets, ds);
		dsl_dataset_sync(ds, zio, tx);
	}
	VERIFY0(zio_wait(zio));

	/*
	 * We have written all of the accounted dirty data, so the per-txg
	 * dirty count (dp_dirty_pertxg[]) should now be zero.  However,
	 * some seldom-used code paths do not adhere to this (e.g.
	 * dbuf_undirty(), also rounding error in dbuf_write_physdone).
	 * Shore up the accounting of any dirtied space now.
	 */
	dsl_pool_undirty_space(dp, dp->dp_dirty_pertxg[txg & TXG_MASK], txg);

	/*
	 * Update the long range free counter after
	 * we're done syncing user data
	 */
	mutex_enter(&dp->dp_lock);
	ASSERT(spa_sync_pass(dp->dp_spa) == 1 ||
	    dp->dp_long_free_dirty_pertxg[txg & TXG_MASK] == 0);
	dp->dp_long_free_dirty_pertxg[txg & TXG_MASK] = 0;
	mutex_exit(&dp->dp_lock);

	/*
	 * After the data blocks have been written (ensured by the zio_wait()
	 * above), update the user/group space accounting.  This happens
	 * in tasks dispatched to dp_sync_taskq, so wait for them before
	 * continuing.
	 */
	for (ds = list_head(&synced_datasets); ds != NULL;
	    ds = list_next(&synced_datasets, ds)) {
		dmu_objset_do_userquota_updates(ds->ds_objset, tx);
	}
	taskq_wait(dp->dp_sync_taskq);

	/*
	 * Sync the datasets again to push out the changes due to
	 * userspace updates.  This must be done before we process the
	 * sync tasks, so that any snapshots will have the correct
	 * user accounting information (and we won't get confused
	 * about which blocks are part of the snapshot).
	 */
	zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
	while ((ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) != NULL) {
		/* Only datasets already synced above may show up again. */
		ASSERT(list_link_active(&ds->ds_synced_link));
		/*
		 * NOTE(review): presumably this drops the extra dbuf hold
		 * taken when the dataset was put back on the dirty list;
		 * confirm against dsl_dataset_dirty().
		 */
		dmu_buf_rele(ds->ds_dbuf, ds);
		dsl_dataset_sync(ds, zio, tx);
	}
	VERIFY0(zio_wait(zio));

	/*
	 * Now that the datasets have been completely synced, we can
	 * clean up our in-memory structures accumulated while syncing:
	 *
	 *  - move dead blocks from the pending deadlist to the on-disk deadlist
	 *  - release hold from dsl_dataset_dirty()
	 */
	while ((ds = list_remove_head(&synced_datasets)) != NULL) {
		dsl_dataset_sync_done(ds, tx);
	}
	while ((dd = txg_list_remove(&dp->dp_dirty_dirs, txg)) != NULL) {
		dsl_dir_sync(dd, tx);
	}

	/*
	 * The MOS's space is accounted for in the pool/$MOS
	 * (dp_mos_dir).  We can't modify the mos while we're syncing
	 * it, so we remember the deltas and apply them here.
	 */
	if (dp->dp_mos_used_delta != 0 || dp->dp_mos_compressed_delta != 0 ||
	    dp->dp_mos_uncompressed_delta != 0) {
		dsl_dir_diduse_space(dp->dp_mos_dir, DD_USED_HEAD,
		    dp->dp_mos_used_delta,
		    dp->dp_mos_compressed_delta,
		    dp->dp_mos_uncompressed_delta, tx);
		dp->dp_mos_used_delta = 0;
		dp->dp_mos_compressed_delta = 0;
		dp->dp_mos_uncompressed_delta = 0;
	}

	/* Only sync the MOS if it has dirty dnodes for this txg. */
	if (!multilist_is_empty(mos->os_dirty_dnodes[txg & TXG_MASK])) {
		dsl_pool_sync_mos(dp, tx);
	}

	/*
	 * If we modify a dataset in the same txg that we want to destroy it,
	 * its dsl_dir's dd_dbuf will be dirty, and thus have a hold on it.
	 * dsl_dir_destroy_check() will fail if there are unexpected holds.
	 * Therefore, we want to sync the MOS (thus syncing the dd_dbuf
	 * and clearing the hold on it) before we process the sync_tasks.
	 * The MOS data dirtied by the sync_tasks will be synced on the next
	 * pass.
	 */
	if (!txg_list_empty(&dp->dp_sync_tasks, txg)) {
		dsl_sync_task_t *dst;
		/*
		 * No more sync tasks should have been added while we
		 * were syncing.
		 */
		ASSERT3U(spa_sync_pass(dp->dp_spa), ==, 1);
		while ((dst = txg_list_remove(&dp->dp_sync_tasks, txg)) != NULL)
			dsl_sync_task_sync(dst, tx);
	}

	dmu_tx_commit(tx);

	DTRACE_PROBE2(dsl_pool_sync__done, dsl_pool_t *dp, dp, uint64_t, txg);
}
683 
684 void
685 dsl_pool_sync_done(dsl_pool_t *dp, uint64_t txg)
686 {
687 	zilog_t *zilog;
688 
689 	while (zilog = txg_list_head(&dp->dp_dirty_zilogs, txg)) {
690 		dsl_dataset_t *ds = dmu_objset_ds(zilog->zl_os);
691 		/*
692 		 * We don't remove the zilog from the dp_dirty_zilogs
693 		 * list until after we've cleaned it. This ensures that
694 		 * callers of zilog_is_dirty() receive an accurate
695 		 * answer when they are racing with the spa sync thread.
696 		 */
697 		zil_clean(zilog, txg);
698 		(void) txg_list_remove_this(&dp->dp_dirty_zilogs, zilog, txg);
699 		ASSERT(!dmu_objset_is_dirty(zilog->zl_os, txg));
700 		dmu_buf_rele(ds->ds_dbuf, zilog);
701 	}
702 	ASSERT(!dmu_objset_is_dirty(dp->dp_meta_objset, txg));
703 }
704 
705 /*
706  * TRUE if the current thread is the tx_sync_thread or if we
707  * are being called from SPA context during pool initialization.
708  */
709 int
710 dsl_pool_sync_context(dsl_pool_t *dp)
711 {
712 	return (curthread == dp->dp_tx.tx_sync_thread ||
713 	    spa_is_initializing(dp->dp_spa) ||
714 	    taskq_member(dp->dp_sync_taskq, curthread));
715 }
716 
717 uint64_t
718 dsl_pool_adjustedsize(dsl_pool_t *dp, boolean_t netfree)
719 {
720 	uint64_t space, resv;
721 
722 	/*
723 	 * If we're trying to assess whether it's OK to do a free,
724 	 * cut the reservation in half to allow forward progress
725 	 * (e.g. make it possible to rm(1) files from a full pool).
726 	 */
727 	space = spa_get_dspace(dp->dp_spa);
728 	resv = spa_get_slop_space(dp->dp_spa);
729 	if (netfree)
730 		resv >>= 1;
731 
732 	return (space - resv);
733 }
734 
735 boolean_t
736 dsl_pool_need_dirty_delay(dsl_pool_t *dp)
737 {
738 	uint64_t delay_min_bytes =
739 	    zfs_dirty_data_max * zfs_delay_min_dirty_percent / 100;
740 	boolean_t rv;
741 
742 	mutex_enter(&dp->dp_lock);
743 	if (dp->dp_dirty_total > zfs_dirty_data_sync)
744 		txg_kick(dp);
745 	rv = (dp->dp_dirty_total > delay_min_bytes);
746 	mutex_exit(&dp->dp_lock);
747 	return (rv);
748 }
749 
750 void
751 dsl_pool_dirty_space(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx)
752 {
753 	if (space > 0) {
754 		mutex_enter(&dp->dp_lock);
755 		dp->dp_dirty_pertxg[tx->tx_txg & TXG_MASK] += space;
756 		dsl_pool_dirty_delta(dp, space);
757 		mutex_exit(&dp->dp_lock);
758 	}
759 }
760 
761 void
762 dsl_pool_undirty_space(dsl_pool_t *dp, int64_t space, uint64_t txg)
763 {
764 	ASSERT3S(space, >=, 0);
765 	if (space == 0)
766 		return;
767 	mutex_enter(&dp->dp_lock);
768 	if (dp->dp_dirty_pertxg[txg & TXG_MASK] < space) {
769 		/* XXX writing something we didn't dirty? */
770 		space = dp->dp_dirty_pertxg[txg & TXG_MASK];
771 	}
772 	ASSERT3U(dp->dp_dirty_pertxg[txg & TXG_MASK], >=, space);
773 	dp->dp_dirty_pertxg[txg & TXG_MASK] -= space;
774 	ASSERT3U(dp->dp_dirty_total, >=, space);
775 	dsl_pool_dirty_delta(dp, -space);
776 	mutex_exit(&dp->dp_lock);
777 }
778 
/*
 * dmu_objset_find_dp() callback used by dsl_pool_upgrade_clones().
 *
 * Starting from head dataset hds, walk back along ds_prev_snap_obj for as
 * long as each previous snapshot's ds_next_snap_obj points back at the
 * dataset we came from.  The walk ends in one of two ways:
 *
 *  - there is no previous snapshot: the dataset is attached to the pool's
 *    $ORIGIN snapshot (dp_origin_snap), unless it is the origin itself;
 *  - the previous snapshot's ds_next_snap_obj names some other dataset,
 *    i.e. ds is a clone of prev.
 *
 * In both cases ds is then added to prev's ds_next_clones_obj ZAP, which
 * is created on first use.
 *
 * arg is the syncing dmu_tx_t.  Returns 0, or the error from a failed
 * dataset hold.
 */
/* ARGSUSED */
static int
upgrade_clones_cb(dsl_pool_t *dp, dsl_dataset_t *hds, void *arg)
{
	dmu_tx_t *tx = arg;
	dsl_dataset_t *ds, *prev = NULL;
	int err;

	/* Take our own hold so the walk below can release as it goes. */
	err = dsl_dataset_hold_obj(dp, hds->ds_object, FTAG, &ds);
	if (err)
		return (err);

	while (dsl_dataset_phys(ds)->ds_prev_snap_obj != 0) {
		err = dsl_dataset_hold_obj(dp,
		    dsl_dataset_phys(ds)->ds_prev_snap_obj, FTAG, &prev);
		if (err) {
			dsl_dataset_rele(ds, FTAG);
			return (err);
		}

		/* prev's next snapshot is someone else: ds is a clone. */
		if (dsl_dataset_phys(prev)->ds_next_snap_obj != ds->ds_object)
			break;
		/* Step back one snapshot; the hold on prev becomes ds's. */
		dsl_dataset_rele(ds, FTAG);
		ds = prev;
		prev = NULL;
	}

	if (prev == NULL) {
		/* dp_origin_snap is held by the pool, not with FTAG. */
		prev = dp->dp_origin_snap;

		/*
		 * The $ORIGIN can't have any data, or the accounting
		 * will be wrong.
		 *
		 * NOTE(review): the lock taken is ds's ds_bp_rwlock while
		 * the block pointer examined belongs to prev -- confirm
		 * this is intended.
		 */
		rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
		ASSERT0(dsl_dataset_phys(prev)->ds_bp.blk_birth);
		rrw_exit(&ds->ds_bp_rwlock, FTAG);

		/* The origin doesn't get attached to itself */
		if (ds->ds_object == prev->ds_object) {
			dsl_dataset_rele(ds, FTAG);
			return (0);
		}

		/* Make ds a child of the origin snapshot. */
		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		dsl_dataset_phys(ds)->ds_prev_snap_obj = prev->ds_object;
		dsl_dataset_phys(ds)->ds_prev_snap_txg =
		    dsl_dataset_phys(prev)->ds_creation_txg;

		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
		dsl_dir_phys(ds->ds_dir)->dd_origin_obj = prev->ds_object;

		dmu_buf_will_dirty(prev->ds_dbuf, tx);
		dsl_dataset_phys(prev)->ds_num_children++;

		/* No next snapshot: also set up the in-core ds_prev hold. */
		if (dsl_dataset_phys(ds)->ds_next_snap_obj == 0) {
			ASSERT(ds->ds_prev == NULL);
			VERIFY0(dsl_dataset_hold_obj(dp,
			    dsl_dataset_phys(ds)->ds_prev_snap_obj,
			    ds, &ds->ds_prev));
		}
	}

	ASSERT3U(dsl_dir_phys(ds->ds_dir)->dd_origin_obj, ==, prev->ds_object);
	ASSERT3U(dsl_dataset_phys(ds)->ds_prev_snap_obj, ==, prev->ds_object);

	/* Create prev's next-clones ZAP on first use, then record ds. */
	if (dsl_dataset_phys(prev)->ds_next_clones_obj == 0) {
		dmu_buf_will_dirty(prev->ds_dbuf, tx);
		dsl_dataset_phys(prev)->ds_next_clones_obj =
		    zap_create(dp->dp_meta_objset,
		    DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
	}
	VERIFY0(zap_add_int(dp->dp_meta_objset,
	    dsl_dataset_phys(prev)->ds_next_clones_obj, ds->ds_object, tx));

	dsl_dataset_rele(ds, FTAG);
	/* Only release prev if we took the hold in the loop above. */
	if (prev != dp->dp_origin_snap)
		dsl_dataset_rele(prev, FTAG);
	return (0);
}
859 
860 void
861 dsl_pool_upgrade_clones(dsl_pool_t *dp, dmu_tx_t *tx)
862 {
863 	ASSERT(dmu_tx_is_syncing(tx));
864 	ASSERT(dp->dp_origin_snap != NULL);
865 
866 	VERIFY0(dmu_objset_find_dp(dp, dp->dp_root_dir_obj, upgrade_clones_cb,
867 	    tx, DS_FIND_CHILDREN | DS_FIND_SERIALIZE));
868 }
869 
870 /* ARGSUSED */
871 static int
872 upgrade_dir_clones_cb(dsl_pool_t *dp, dsl_dataset_t *ds, void *arg)
873 {
874 	dmu_tx_t *tx = arg;
875 	objset_t *mos = dp->dp_meta_objset;
876 
877 	if (dsl_dir_phys(ds->ds_dir)->dd_origin_obj != 0) {
878 		dsl_dataset_t *origin;
879 
880 		VERIFY0(dsl_dataset_hold_obj(dp,
881 		    dsl_dir_phys(ds->ds_dir)->dd_origin_obj, FTAG, &origin));
882 
883 		if (dsl_dir_phys(origin->ds_dir)->dd_clones == 0) {
884 			dmu_buf_will_dirty(origin->ds_dir->dd_dbuf, tx);
885 			dsl_dir_phys(origin->ds_dir)->dd_clones =
886 			    zap_create(mos, DMU_OT_DSL_CLONES, DMU_OT_NONE,
887 			    0, tx);
888 		}
889 
890 		VERIFY0(zap_add_int(dp->dp_meta_objset,
891 		    dsl_dir_phys(origin->ds_dir)->dd_clones,
892 		    ds->ds_object, tx));
893 
894 		dsl_dataset_rele(origin, FTAG);
895 	}
896 	return (0);
897 }
898 
899 void
900 dsl_pool_upgrade_dir_clones(dsl_pool_t *dp, dmu_tx_t *tx)
901 {
902 	ASSERT(dmu_tx_is_syncing(tx));
903 	uint64_t obj;
904 
905 	(void) dsl_dir_create_sync(dp, dp->dp_root_dir, FREE_DIR_NAME, tx);
906 	VERIFY0(dsl_pool_open_special_dir(dp,
907 	    FREE_DIR_NAME, &dp->dp_free_dir));
908 
909 	/*
910 	 * We can't use bpobj_alloc(), because spa_version() still
911 	 * returns the old version, and we need a new-version bpobj with
912 	 * subobj support.  So call dmu_object_alloc() directly.
913 	 */
914 	obj = dmu_object_alloc(dp->dp_meta_objset, DMU_OT_BPOBJ,
915 	    SPA_OLD_MAXBLOCKSIZE, DMU_OT_BPOBJ_HDR, sizeof (bpobj_phys_t), tx);
916 	VERIFY0(zap_add(dp->dp_meta_objset, DMU_POOL_DIRECTORY_OBJECT,
917 	    DMU_POOL_FREE_BPOBJ, sizeof (uint64_t), 1, &obj, tx));
918 	VERIFY0(bpobj_open(&dp->dp_free_bpobj, dp->dp_meta_objset, obj));
919 
920 	VERIFY0(dmu_objset_find_dp(dp, dp->dp_root_dir_obj,
921 	    upgrade_dir_clones_cb, tx, DS_FIND_CHILDREN | DS_FIND_SERIALIZE));
922 }
923 
924 void
925 dsl_pool_create_origin(dsl_pool_t *dp, dmu_tx_t *tx)
926 {
927 	uint64_t dsobj;
928 	dsl_dataset_t *ds;
929 
930 	ASSERT(dmu_tx_is_syncing(tx));
931 	ASSERT(dp->dp_origin_snap == NULL);
932 	ASSERT(rrw_held(&dp->dp_config_rwlock, RW_WRITER));
933 
934 	/* create the origin dir, ds, & snap-ds */
935 	dsobj = dsl_dataset_create_sync(dp->dp_root_dir, ORIGIN_DIR_NAME,
936 	    NULL, 0, kcred, tx);
937 	VERIFY0(dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds));
938 	dsl_dataset_snapshot_sync_impl(ds, ORIGIN_DIR_NAME, tx);
939 	VERIFY0(dsl_dataset_hold_obj(dp, dsl_dataset_phys(ds)->ds_prev_snap_obj,
940 	    dp, &dp->dp_origin_snap));
941 	dsl_dataset_rele(ds, FTAG);
942 }
943 
944 taskq_t *
945 dsl_pool_vnrele_taskq(dsl_pool_t *dp)
946 {
947 	return (dp->dp_vnrele_taskq);
948 }
949 
950 /*
951  * Walk through the pool-wide zap object of temporary snapshot user holds
952  * and release them.
953  */
954 void
955 dsl_pool_clean_tmp_userrefs(dsl_pool_t *dp)
956 {
957 	zap_attribute_t za;
958 	zap_cursor_t zc;
959 	objset_t *mos = dp->dp_meta_objset;
960 	uint64_t zapobj = dp->dp_tmp_userrefs_obj;
961 	nvlist_t *holds;
962 
963 	if (zapobj == 0)
964 		return;
965 	ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
966 
967 	holds = fnvlist_alloc();
968 
969 	for (zap_cursor_init(&zc, mos, zapobj);
970 	    zap_cursor_retrieve(&zc, &za) == 0;
971 	    zap_cursor_advance(&zc)) {
972 		char *htag;
973 		nvlist_t *tags;
974 
975 		htag = strchr(za.za_name, '-');
976 		*htag = '\0';
977 		++htag;
978 		if (nvlist_lookup_nvlist(holds, za.za_name, &tags) != 0) {
979 			tags = fnvlist_alloc();
980 			fnvlist_add_boolean(tags, htag);
981 			fnvlist_add_nvlist(holds, za.za_name, tags);
982 			fnvlist_free(tags);
983 		} else {
984 			fnvlist_add_boolean(tags, htag);
985 		}
986 	}
987 	dsl_dataset_user_release_tmp(dp, holds);
988 	fnvlist_free(holds);
989 	zap_cursor_fini(&zc);
990 }
991 
992 /*
993  * Create the pool-wide zap object for storing temporary snapshot holds.
994  */
995 void
996 dsl_pool_user_hold_create_obj(dsl_pool_t *dp, dmu_tx_t *tx)
997 {
998 	objset_t *mos = dp->dp_meta_objset;
999 
1000 	ASSERT(dp->dp_tmp_userrefs_obj == 0);
1001 	ASSERT(dmu_tx_is_syncing(tx));
1002 
1003 	dp->dp_tmp_userrefs_obj = zap_create_link(mos, DMU_OT_USERREFS,
1004 	    DMU_POOL_DIRECTORY_OBJECT, DMU_POOL_TMP_USERREFS, tx);
1005 }
1006 
1007 static int
1008 dsl_pool_user_hold_rele_impl(dsl_pool_t *dp, uint64_t dsobj,
1009     const char *tag, uint64_t now, dmu_tx_t *tx, boolean_t holding)
1010 {
1011 	objset_t *mos = dp->dp_meta_objset;
1012 	uint64_t zapobj = dp->dp_tmp_userrefs_obj;
1013 	char *name;
1014 	int error;
1015 
1016 	ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
1017 	ASSERT(dmu_tx_is_syncing(tx));
1018 
1019 	/*
1020 	 * If the pool was created prior to SPA_VERSION_USERREFS, the
1021 	 * zap object for temporary holds might not exist yet.
1022 	 */
1023 	if (zapobj == 0) {
1024 		if (holding) {
1025 			dsl_pool_user_hold_create_obj(dp, tx);
1026 			zapobj = dp->dp_tmp_userrefs_obj;
1027 		} else {
1028 			return (SET_ERROR(ENOENT));
1029 		}
1030 	}
1031 
1032 	name = kmem_asprintf("%llx-%s", (u_longlong_t)dsobj, tag);
1033 	if (holding)
1034 		error = zap_add(mos, zapobj, name, 8, 1, &now, tx);
1035 	else
1036 		error = zap_remove(mos, zapobj, name, tx);
1037 	strfree(name);
1038 
1039 	return (error);
1040 }
1041 
1042 /*
1043  * Add a temporary hold for the given dataset object and tag.
1044  */
1045 int
1046 dsl_pool_user_hold(dsl_pool_t *dp, uint64_t dsobj, const char *tag,
1047     uint64_t now, dmu_tx_t *tx)
1048 {
1049 	return (dsl_pool_user_hold_rele_impl(dp, dsobj, tag, now, tx, B_TRUE));
1050 }
1051 
1052 /*
1053  * Release a temporary hold for the given dataset object and tag.
1054  */
1055 int
1056 dsl_pool_user_release(dsl_pool_t *dp, uint64_t dsobj, const char *tag,
1057     dmu_tx_t *tx)
1058 {
1059 	return (dsl_pool_user_hold_rele_impl(dp, dsobj, tag, NULL,
1060 	    tx, B_FALSE));
1061 }
1062 
1063 /*
1064  * DSL Pool Configuration Lock
1065  *
1066  * The dp_config_rwlock protects against changes to DSL state (e.g. dataset
1067  * creation / destruction / rename / property setting).  It must be held for
1068  * read to hold a dataset or dsl_dir.  I.e. you must call
1069  * dsl_pool_config_enter() or dsl_pool_hold() before calling
1070  * dsl_{dataset,dir}_hold{_obj}.  In most circumstances, the dp_config_rwlock
1071  * must be held continuously until all datasets and dsl_dirs are released.
1072  *
1073  * The only exception to this rule is that if a "long hold" is placed on
1074  * a dataset, then the dp_config_rwlock may be dropped while the dataset
1075  * is still held.  The long hold will prevent the dataset from being
1076  * destroyed -- the destroy will fail with EBUSY.  A long hold can be
1077  * obtained by calling dsl_dataset_long_hold(), or by "owning" a dataset
1078  * (by calling dsl_{dataset,objset}_{try}own{_obj}).
1079  *
1080  * Legitimate long-holders (including owners) should be long-running, cancelable
1081  * tasks that should cause "zfs destroy" to fail.  This includes DMU
1082  * consumers (i.e. a ZPL filesystem being mounted or ZVOL being open),
1083  * "zfs send", and "zfs diff".  There are several other long-holders whose
1084  * uses are suboptimal (e.g. "zfs promote", and zil_suspend()).
1085  *
1086  * The usual formula for long-holding would be:
1087  * dsl_pool_hold()
1088  * dsl_dataset_hold()
1089  * ... perform checks ...
1090  * dsl_dataset_long_hold()
1091  * dsl_pool_rele()
1092  * ... perform long-running task ...
1093  * dsl_dataset_long_rele()
1094  * dsl_dataset_rele()
1095  *
1096  * Note that when the long hold is released, the dataset is still held but
1097  * the pool is not held.  The dataset may change arbitrarily during this time
1098  * (e.g. it could be destroyed).  Therefore you shouldn't do anything to the
1099  * dataset except release it.
1100  *
1101  * User-initiated operations (e.g. ioctls, zfs_ioc_*()) are either read-only
1102  * or modifying operations.
1103  *
1104  * Modifying operations should generally use dsl_sync_task().  The synctask
1105  * infrastructure enforces proper locking strategy with respect to the
1106  * dp_config_rwlock.  See the comment above dsl_sync_task() for details.
1107  *
1108  * Read-only operations will manually hold the pool, then the dataset, obtain
1109  * information from the dataset, then release the pool and dataset.
1110  * dmu_objset_{hold,rele}() are convenience routines that also do the pool
1111  * hold/rele.
1112  */
1113 
1114 int
1115 dsl_pool_hold(const char *name, void *tag, dsl_pool_t **dp)
1116 {
1117 	spa_t *spa;
1118 	int error;
1119 
1120 	error = spa_open(name, &spa, tag);
1121 	if (error == 0) {
1122 		*dp = spa_get_dsl(spa);
1123 		dsl_pool_config_enter(*dp, tag);
1124 	}
1125 	return (error);
1126 }
1127 
1128 void
1129 dsl_pool_rele(dsl_pool_t *dp, void *tag)
1130 {
1131 	dsl_pool_config_exit(dp, tag);
1132 	spa_close(dp->dp_spa, tag);
1133 }
1134 
1135 void
1136 dsl_pool_config_enter(dsl_pool_t *dp, void *tag)
1137 {
1138 	/*
1139 	 * We use a "reentrant" reader-writer lock, but not reentrantly.
1140 	 *
1141 	 * The rrwlock can (with the track_all flag) track all reading threads,
1142 	 * which is very useful for debugging which code path failed to release
1143 	 * the lock, and for verifying that the *current* thread does hold
1144 	 * the lock.
1145 	 *
1146 	 * (Unlike a rwlock, which knows that N threads hold it for
1147 	 * read, but not *which* threads, so rw_held(RW_READER) returns TRUE
1148 	 * if any thread holds it for read, even if this thread doesn't).
1149 	 */
1150 	ASSERT(!rrw_held(&dp->dp_config_rwlock, RW_READER));
1151 	rrw_enter(&dp->dp_config_rwlock, RW_READER, tag);
1152 }
1153 
1154 void
1155 dsl_pool_config_enter_prio(dsl_pool_t *dp, void *tag)
1156 {
1157 	ASSERT(!rrw_held(&dp->dp_config_rwlock, RW_READER));
1158 	rrw_enter_read_prio(&dp->dp_config_rwlock, tag);
1159 }
1160 
1161 void
1162 dsl_pool_config_exit(dsl_pool_t *dp, void *tag)
1163 {
1164 	rrw_exit(&dp->dp_config_rwlock, tag);
1165 }
1166 
1167 boolean_t
1168 dsl_pool_config_held(dsl_pool_t *dp)
1169 {
1170 	return (RRW_LOCK_HELD(&dp->dp_config_rwlock));
1171 }
1172 
1173 boolean_t
1174 dsl_pool_config_held_writer(dsl_pool_t *dp)
1175 {
1176 	return (RRW_WRITE_HELD(&dp->dp_config_rwlock));
1177 }
1178