/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/dmu_objset.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_synctask.h>
#include <sys/dmu_traverse.h>
#include <sys/dmu_tx.h>
#include <sys/arc.h>
#include <sys/zio.h>
#include <sys/zap.h>
#include <sys/unique.h>
#include <sys/zfs_context.h>
#include <sys/zfs_ioctl.h>
#include <sys/spa.h>
#include <sys/zfs_znode.h>
#include <sys/sunddi.h>
#include <sys/zvol.h>

static char *dsl_reaper = "the grim reaper";

static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
static dsl_syncfunc_t dsl_dataset_set_reservation_sync;

#define	DS_REF_MAX	(1ULL << 62)

#define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE

#define	DSL_DATASET_IS_DESTROYED(ds)	((ds)->ds_owner == dsl_reaper)
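/*
 * Note: dsl_reaper is used purely as a sentinel ds_owner value; the
 * comparison above is by pointer, so the string contents never matter.
 */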


/*
 * Figure out how much of this delta should be propagated to the dsl_dir
 * layer.  If there's a refreservation, that space has already been
 * partially accounted for in our ancestors.
 */
static int64_t
parent_delta(dsl_dataset_t *ds, int64_t delta)
{
	uint64_t old_bytes, new_bytes;

	if (ds->ds_reserved == 0)
		return (delta);

	old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
	new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);

	ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
	return (new_bytes - old_bytes);
}
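/*
 * Illustrative example (hypothetical numbers): with ds_reserved == 100
 * and ds_unique_bytes == 80, a delta of +30 gives old_bytes = 100 and
 * new_bytes = 110, so only 10 bytes propagate to the dsl_dir layer;
 * the first 20 were already covered by the refreservation.
 */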

void
dsl_dataset_block_born(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx)
{
	int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
	int compressed = BP_GET_PSIZE(bp);
	int uncompressed = BP_GET_UCSIZE(bp);
	int64_t delta;

	dprintf_bp(bp, "born, ds=%p\n", ds);

	ASSERT(dmu_tx_is_syncing(tx));
	/* It could have been compressed away to nothing */
	if (BP_IS_HOLE(bp))
		return;
	ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
	ASSERT3U(BP_GET_TYPE(bp), <, DMU_OT_NUMTYPES);
	if (ds == NULL) {
		/*
		 * Account for the meta-objset space in its placeholder
		 * dsl_dir.
		 */
		ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
		    used, compressed, uncompressed, tx);
		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
		return;
	}
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	mutex_enter(&ds->ds_dir->dd_lock);
	mutex_enter(&ds->ds_lock);
	delta = parent_delta(ds, used);
	ds->ds_phys->ds_used_bytes += used;
	ds->ds_phys->ds_compressed_bytes += compressed;
	ds->ds_phys->ds_uncompressed_bytes += uncompressed;
	ds->ds_phys->ds_unique_bytes += used;
	mutex_exit(&ds->ds_lock);
	dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD, delta,
	    compressed, uncompressed, tx);
	dsl_dir_transfer_space(ds->ds_dir, used - delta,
	    DD_USED_REFRSRV, DD_USED_HEAD, tx);
	mutex_exit(&ds->ds_dir->dd_lock);
}

int
dsl_dataset_block_kill(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx,
    boolean_t async)
{
	if (BP_IS_HOLE(bp))
		return (0);

	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(bp->blk_birth <= tx->tx_txg);

	int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
	int compressed = BP_GET_PSIZE(bp);
	int uncompressed = BP_GET_UCSIZE(bp);

	ASSERT(used > 0);
	if (ds == NULL) {
		/*
		 * Account for the meta-objset space in its placeholder
		 * dataset.
		 */
		dsl_free(tx->tx_pool, tx->tx_txg, bp);

		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
		    -used, -compressed, -uncompressed, tx);
		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
		return (used);
	}
	ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);

	ASSERT(!dsl_dataset_is_snapshot(ds));
	dmu_buf_will_dirty(ds->ds_dbuf, tx);

	if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
		int64_t delta;

		dprintf_bp(bp, "freeing: %s", "");
		dsl_free(tx->tx_pool, tx->tx_txg, bp);

		mutex_enter(&ds->ds_dir->dd_lock);
		mutex_enter(&ds->ds_lock);
		ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
		    !DS_UNIQUE_IS_ACCURATE(ds));
		delta = parent_delta(ds, -used);
		ds->ds_phys->ds_unique_bytes -= used;
		mutex_exit(&ds->ds_lock);
		dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
		    delta, -compressed, -uncompressed, tx);
		dsl_dir_transfer_space(ds->ds_dir, -used - delta,
		    DD_USED_REFRSRV, DD_USED_HEAD, tx);
		mutex_exit(&ds->ds_dir->dd_lock);
	} else {
		dprintf_bp(bp, "putting on dead list: %s", "");
		if (async) {
			/*
			 * We are here as part of zio's write done callback,
			 * which means we're a zio interrupt thread.  We can't
			 * call bplist_enqueue() now because it may block
			 * waiting for I/O.  Instead, put bp on the deferred
			 * queue and let dsl_pool_sync() finish the job.
			 */
			bplist_enqueue_deferred(&ds->ds_deadlist, bp);
		} else {
			VERIFY(0 == bplist_enqueue(&ds->ds_deadlist, bp, tx));
		}
		ASSERT3U(ds->ds_prev->ds_object, ==,
		    ds->ds_phys->ds_prev_snap_obj);
		ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
		/* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
		if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
		    ds->ds_object && bp->blk_birth >
		    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
			mutex_enter(&ds->ds_prev->ds_lock);
			ds->ds_prev->ds_phys->ds_unique_bytes += used;
			mutex_exit(&ds->ds_prev->ds_lock);
		}
		if (bp->blk_birth > ds->ds_origin_txg) {
			dsl_dir_transfer_space(ds->ds_dir, used,
			    DD_USED_HEAD, DD_USED_SNAP, tx);
		}
	}
	mutex_enter(&ds->ds_lock);
	ASSERT3U(ds->ds_phys->ds_used_bytes, >=, used);
	ds->ds_phys->ds_used_bytes -= used;
	ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
	ds->ds_phys->ds_compressed_bytes -= compressed;
	ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
	ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
	mutex_exit(&ds->ds_lock);

	return (used);
}
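/*
 * Note: dsl_dataset_block_kill() roughly mirrors dsl_dataset_block_born().
 * A block born after the most recent snapshot is freed immediately; an
 * older block is still referenced by that snapshot, so it goes on the
 * deadlist instead of being freed.
 */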

uint64_t
dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
{
	uint64_t trysnap = 0;

	if (ds == NULL)
		return (0);
	/*
	 * The snapshot creation could fail, but that would cause an
	 * incorrect FALSE return, which would only result in an
	 * overestimation of the amount of space that an operation would
	 * consume, which is OK.
	 *
	 * There's also a small window where we could miss a pending
	 * snapshot, because we could set the sync task in the quiescing
	 * phase.  So this should only be used as a guess.
	 */
	if (ds->ds_trysnap_txg >
	    spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
		trysnap = ds->ds_trysnap_txg;
	return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
}

boolean_t
dsl_dataset_block_freeable(dsl_dataset_t *ds, uint64_t blk_birth)
{
	return (blk_birth > dsl_dataset_prev_snap_txg(ds));
}
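/*
 * Example (hypothetical txgs): a block born in txg 8 is freeable when the
 * most recent snapshot was taken at txg 5 (8 > 5), but not when a
 * snapshot exists at txg 10, since that snapshot still references it.
 */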

/* ARGSUSED */
static void
dsl_dataset_evict(dmu_buf_t *db, void *dsv)
{
	dsl_dataset_t *ds = dsv;

	ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));

	unique_remove(ds->ds_fsid_guid);

	if (ds->ds_objset != NULL)
		dmu_objset_evict(ds->ds_objset);

	if (ds->ds_prev) {
		dsl_dataset_drop_ref(ds->ds_prev, ds);
		ds->ds_prev = NULL;
	}

	bplist_close(&ds->ds_deadlist);
	if (ds->ds_dir)
		dsl_dir_close(ds->ds_dir, ds);

	ASSERT(!list_link_active(&ds->ds_synced_link));

	mutex_destroy(&ds->ds_lock);
	mutex_destroy(&ds->ds_recvlock);
	mutex_destroy(&ds->ds_opening_lock);
	rw_destroy(&ds->ds_rwlock);
	cv_destroy(&ds->ds_exclusive_cv);
	bplist_fini(&ds->ds_deadlist);

	kmem_free(ds, sizeof (dsl_dataset_t));
}

static int
dsl_dataset_get_snapname(dsl_dataset_t *ds)
{
	dsl_dataset_phys_t *headphys;
	int err;
	dmu_buf_t *headdbuf;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;

	if (ds->ds_snapname[0])
		return (0);
	if (ds->ds_phys->ds_next_snap_obj == 0)
		return (0);

	err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
	    FTAG, &headdbuf);
	if (err)
		return (err);
	headphys = headdbuf->db_data;
	err = zap_value_search(dp->dp_meta_objset,
	    headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
	dmu_buf_rele(headdbuf, FTAG);
	return (err);
}

static int
dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
{
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
	matchtype_t mt;
	int err;

	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
		mt = MT_FIRST;
	else
		mt = MT_EXACT;

	err = zap_lookup_norm(mos, snapobj, name, 8, 1,
	    value, mt, NULL, 0, NULL);
	if (err == ENOTSUP && mt == MT_FIRST)
		err = zap_lookup(mos, snapobj, name, 8, 1, value);
	return (err);
}

static int
dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
{
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
	matchtype_t mt;
	int err;

	dsl_dir_snap_cmtime_update(ds->ds_dir);

	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
		mt = MT_FIRST;
	else
		mt = MT_EXACT;

	err = zap_remove_norm(mos, snapobj, name, mt, tx);
	if (err == ENOTSUP && mt == MT_FIRST)
		err = zap_remove(mos, snapobj, name, tx);
	return (err);
}

static int
dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
    dsl_dataset_t **dsp)
{
	objset_t *mos = dp->dp_meta_objset;
	dmu_buf_t *dbuf;
	dsl_dataset_t *ds;
	int err;

	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
	    dsl_pool_sync_context(dp));

	err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
	if (err)
		return (err);
	ds = dmu_buf_get_user(dbuf);
	if (ds == NULL) {
		dsl_dataset_t *winner;

		ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
		ds->ds_dbuf = dbuf;
		ds->ds_object = dsobj;
		ds->ds_phys = dbuf->db_data;

		mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&ds->ds_recvlock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
		rw_init(&ds->ds_rwlock, 0, 0, 0);
		cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);
		bplist_init(&ds->ds_deadlist);

		err = bplist_open(&ds->ds_deadlist,
		    mos, ds->ds_phys->ds_deadlist_obj);
		if (err == 0) {
			err = dsl_dir_open_obj(dp,
			    ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
		}
		if (err) {
			/*
			 * we don't really need to close the bplist if we
			 * just opened it.
			 */
			mutex_destroy(&ds->ds_lock);
			mutex_destroy(&ds->ds_recvlock);
			mutex_destroy(&ds->ds_opening_lock);
			rw_destroy(&ds->ds_rwlock);
			cv_destroy(&ds->ds_exclusive_cv);
			bplist_fini(&ds->ds_deadlist);
			kmem_free(ds, sizeof (dsl_dataset_t));
			dmu_buf_rele(dbuf, tag);
			return (err);
		}

		if (!dsl_dataset_is_snapshot(ds)) {
			ds->ds_snapname[0] = '\0';
			if (ds->ds_phys->ds_prev_snap_obj) {
				err = dsl_dataset_get_ref(dp,
				    ds->ds_phys->ds_prev_snap_obj,
				    ds, &ds->ds_prev);
			}

			if (err == 0 && dsl_dir_is_clone(ds->ds_dir)) {
				dsl_dataset_t *origin;

				err = dsl_dataset_hold_obj(dp,
				    ds->ds_dir->dd_phys->dd_origin_obj,
				    FTAG, &origin);
				if (err == 0) {
					ds->ds_origin_txg =
					    origin->ds_phys->ds_creation_txg;
					dsl_dataset_rele(origin, FTAG);
				}
			}
		} else {
			if (zfs_flags & ZFS_DEBUG_SNAPNAMES)
				err = dsl_dataset_get_snapname(ds);
			if (err == 0 && ds->ds_phys->ds_userrefs_obj != 0) {
				err = zap_count(
				    ds->ds_dir->dd_pool->dp_meta_objset,
				    ds->ds_phys->ds_userrefs_obj,
				    &ds->ds_userrefs);
			}
		}

		if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
			/*
			 * In sync context, we're called with either no lock
			 * or with the write lock.  If we're not syncing,
			 * we're always called with the read lock held.
			 */
			boolean_t need_lock =
			    !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
			    dsl_pool_sync_context(dp);

			if (need_lock)
				rw_enter(&dp->dp_config_rwlock, RW_READER);

			err = dsl_prop_get_ds(ds,
			    "refreservation", sizeof (uint64_t), 1,
			    &ds->ds_reserved, NULL);
			if (err == 0) {
				err = dsl_prop_get_ds(ds,
				    "refquota", sizeof (uint64_t), 1,
				    &ds->ds_quota, NULL);
			}

			if (need_lock)
				rw_exit(&dp->dp_config_rwlock);
		} else {
			ds->ds_reserved = ds->ds_quota = 0;
		}

		if (err == 0) {
			winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
			    dsl_dataset_evict);
		}
		if (err || winner) {
			bplist_close(&ds->ds_deadlist);
			if (ds->ds_prev)
				dsl_dataset_drop_ref(ds->ds_prev, ds);
			dsl_dir_close(ds->ds_dir, ds);
			mutex_destroy(&ds->ds_lock);
			mutex_destroy(&ds->ds_recvlock);
			mutex_destroy(&ds->ds_opening_lock);
			rw_destroy(&ds->ds_rwlock);
			cv_destroy(&ds->ds_exclusive_cv);
			bplist_fini(&ds->ds_deadlist);
			kmem_free(ds, sizeof (dsl_dataset_t));
			if (err) {
				dmu_buf_rele(dbuf, tag);
				return (err);
			}
			ds = winner;
		} else {
			ds->ds_fsid_guid =
			    unique_insert(ds->ds_phys->ds_fsid_guid);
		}
	}
	ASSERT3P(ds->ds_dbuf, ==, dbuf);
	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
	ASSERT(ds->ds_phys->ds_prev_snap_obj != 0 ||
	    spa_version(dp->dp_spa) < SPA_VERSION_ORIGIN ||
	    dp->dp_origin_snap == NULL || ds == dp->dp_origin_snap);
	mutex_enter(&ds->ds_lock);
	if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
		mutex_exit(&ds->ds_lock);
		dmu_buf_rele(ds->ds_dbuf, tag);
		return (ENOENT);
	}
	mutex_exit(&ds->ds_lock);
	*dsp = ds;
	return (0);
}

static int
dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/*
	 * In syncing context we don't take the rwlock: there may be an
	 * existing writer waiting for sync phase to finish.  We don't
	 * need to worry about such writers, since sync phase is
	 * single-threaded, so the writer can't be doing anything while
	 * we are active.
	 */
	if (dsl_pool_sync_context(dp)) {
		ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
		return (0);
	}

	/*
	 * Normal users will hold the ds_rwlock as a READER until they
	 * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
	 * drop their READER lock after they set the ds_owner field.
	 *
	 * If the dataset is being destroyed, the destroy thread will
	 * obtain a WRITER lock for exclusive access after it's done its
	 * open-context work and then change the ds_owner to
	 * dsl_reaper once destruction is assured.  So threads
	 * may block here temporarily, until the "destructibility" of
	 * the dataset is determined.
	 */
	ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
	mutex_enter(&ds->ds_lock);
	while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
		rw_exit(&dp->dp_config_rwlock);
		cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
		if (DSL_DATASET_IS_DESTROYED(ds)) {
			mutex_exit(&ds->ds_lock);
			dsl_dataset_drop_ref(ds, tag);
			rw_enter(&dp->dp_config_rwlock, RW_READER);
			return (ENOENT);
		}
		rw_enter(&dp->dp_config_rwlock, RW_READER);
	}
	mutex_exit(&ds->ds_lock);
	return (0);
}

int
dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
    dsl_dataset_t **dsp)
{
	int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);

	if (err)
		return (err);
	return (dsl_dataset_hold_ref(*dsp, tag));
}

int
dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, boolean_t inconsistentok,
    void *tag, dsl_dataset_t **dsp)
{
	int err = dsl_dataset_hold_obj(dp, dsobj, tag, dsp);
	if (err)
		return (err);
	if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
		dsl_dataset_rele(*dsp, tag);
		*dsp = NULL;
		return (EBUSY);
	}
	return (0);
}
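/*
 * Usage sketch (illustrative, not taken from this file): transient
 * readers pair a hold with a release, while exclusive consumers own the
 * dataset and must disown it when done:
 *
 *	if (dsl_dataset_own_obj(dp, dsobj, B_FALSE, FTAG, &ds) == 0) {
 *		... exclusive work on ds ...
 *		dsl_dataset_disown(ds, FTAG);
 *	}
 */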

int
dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
{
	dsl_dir_t *dd;
	dsl_pool_t *dp;
	const char *snapname;
	uint64_t obj;
	int err = 0;

	err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
	if (err)
		return (err);

	dp = dd->dd_pool;
	obj = dd->dd_phys->dd_head_dataset_obj;
	rw_enter(&dp->dp_config_rwlock, RW_READER);
	if (obj)
		err = dsl_dataset_get_ref(dp, obj, tag, dsp);
	else
		err = ENOENT;
	if (err)
		goto out;

	err = dsl_dataset_hold_ref(*dsp, tag);

	/* we may be looking for a snapshot */
	if (err == 0 && snapname != NULL) {
		dsl_dataset_t *ds = NULL;

		if (*snapname++ != '@') {
			dsl_dataset_rele(*dsp, tag);
			err = ENOENT;
			goto out;
		}

		dprintf("looking for snapshot '%s'\n", snapname);
		err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
		if (err == 0)
			err = dsl_dataset_get_ref(dp, obj, tag, &ds);
		dsl_dataset_rele(*dsp, tag);

		ASSERT3U((err == 0), ==, (ds != NULL));

		if (ds) {
			mutex_enter(&ds->ds_lock);
			if (ds->ds_snapname[0] == 0)
				(void) strlcpy(ds->ds_snapname, snapname,
				    sizeof (ds->ds_snapname));
			mutex_exit(&ds->ds_lock);
			err = dsl_dataset_hold_ref(ds, tag);
			*dsp = err ? NULL : ds;
		}
	}
out:
	rw_exit(&dp->dp_config_rwlock);
	dsl_dir_close(dd, FTAG);
	return (err);
}

int
dsl_dataset_own(const char *name, boolean_t inconsistentok,
    void *tag, dsl_dataset_t **dsp)
{
	int err = dsl_dataset_hold(name, tag, dsp);
	if (err)
		return (err);
	if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
		dsl_dataset_rele(*dsp, tag);
		return (EBUSY);
	}
	return (0);
}

void
dsl_dataset_name(dsl_dataset_t *ds, char *name)
{
	if (ds == NULL) {
		(void) strcpy(name, "mos");
	} else {
		dsl_dir_name(ds->ds_dir, name);
		VERIFY(0 == dsl_dataset_get_snapname(ds));
		if (ds->ds_snapname[0]) {
			(void) strcat(name, "@");
			/*
			 * We use a "recursive" mutex so that we
			 * can call dprintf_ds() with ds_lock held.
			 */
			if (!MUTEX_HELD(&ds->ds_lock)) {
				mutex_enter(&ds->ds_lock);
				(void) strcat(name, ds->ds_snapname);
				mutex_exit(&ds->ds_lock);
			} else {
				(void) strcat(name, ds->ds_snapname);
			}
		}
	}
}

static int
dsl_dataset_namelen(dsl_dataset_t *ds)
{
	int result;

	if (ds == NULL) {
		result = 3;	/* "mos" */
	} else {
		result = dsl_dir_namelen(ds->ds_dir);
		VERIFY(0 == dsl_dataset_get_snapname(ds));
		if (ds->ds_snapname[0]) {
			++result;	/* adding one for the @-sign */
			if (!MUTEX_HELD(&ds->ds_lock)) {
				mutex_enter(&ds->ds_lock);
				result += strlen(ds->ds_snapname);
				mutex_exit(&ds->ds_lock);
			} else {
				result += strlen(ds->ds_snapname);
			}
		}
	}

	return (result);
}

void
dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
{
	dmu_buf_rele(ds->ds_dbuf, tag);
}

void
dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
{
	if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
		rw_exit(&ds->ds_rwlock);
	}
	dsl_dataset_drop_ref(ds, tag);
}

void
dsl_dataset_disown(dsl_dataset_t *ds, void *tag)
{
	ASSERT((ds->ds_owner == tag && ds->ds_dbuf) ||
	    (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));

	mutex_enter(&ds->ds_lock);
	ds->ds_owner = NULL;
	if (RW_WRITE_HELD(&ds->ds_rwlock)) {
		rw_exit(&ds->ds_rwlock);
		cv_broadcast(&ds->ds_exclusive_cv);
	}
	mutex_exit(&ds->ds_lock);
	if (ds->ds_dbuf)
		dsl_dataset_drop_ref(ds, tag);
	else
		dsl_dataset_evict(ds->ds_dbuf, ds);
}

boolean_t
dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *tag)
{
	boolean_t gotit = FALSE;

	mutex_enter(&ds->ds_lock);
	if (ds->ds_owner == NULL &&
	    (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
		ds->ds_owner = tag;
		if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
			rw_exit(&ds->ds_rwlock);
		gotit = TRUE;
	}
	mutex_exit(&ds->ds_lock);
	return (gotit);
}

void
dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
{
	ASSERT3P(owner, ==, ds->ds_owner);
	if (!RW_WRITE_HELD(&ds->ds_rwlock))
		rw_enter(&ds->ds_rwlock, RW_WRITER);
}

uint64_t
dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
    uint64_t flags, dmu_tx_t *tx)
{
	dsl_pool_t *dp = dd->dd_pool;
	dmu_buf_t *dbuf;
	dsl_dataset_phys_t *dsphys;
	uint64_t dsobj;
	objset_t *mos = dp->dp_meta_objset;

	if (origin == NULL)
		origin = dp->dp_origin_snap;

	ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
	ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);

	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
	dmu_buf_will_dirty(dbuf, tx);
	dsphys = dbuf->db_data;
	bzero(dsphys, sizeof (dsl_dataset_phys_t));
	dsphys->ds_dir_obj = dd->dd_object;
	dsphys->ds_flags = flags;
	dsphys->ds_fsid_guid = unique_create();
	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
	    sizeof (dsphys->ds_guid));
	dsphys->ds_snapnames_zapobj =
	    zap_create_norm(mos, U8_TEXTPREP_TOUPPER, DMU_OT_DSL_DS_SNAP_MAP,
	    DMU_OT_NONE, 0, tx);
	dsphys->ds_creation_time = gethrestime_sec();
	dsphys->ds_creation_txg = tx->tx_txg == TXG_INITIAL ? 1 : tx->tx_txg;
	dsphys->ds_deadlist_obj =
	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);

	if (origin) {
		dsphys->ds_prev_snap_obj = origin->ds_object;
		dsphys->ds_prev_snap_txg =
		    origin->ds_phys->ds_creation_txg;
		dsphys->ds_used_bytes =
		    origin->ds_phys->ds_used_bytes;
		dsphys->ds_compressed_bytes =
		    origin->ds_phys->ds_compressed_bytes;
		dsphys->ds_uncompressed_bytes =
		    origin->ds_phys->ds_uncompressed_bytes;
		dsphys->ds_bp = origin->ds_phys->ds_bp;
		dsphys->ds_flags |= origin->ds_phys->ds_flags;

		dmu_buf_will_dirty(origin->ds_dbuf, tx);
		origin->ds_phys->ds_num_children++;

		if (spa_version(dp->dp_spa) >= SPA_VERSION_NEXT_CLONES) {
			if (origin->ds_phys->ds_next_clones_obj == 0) {
				origin->ds_phys->ds_next_clones_obj =
				    zap_create(mos,
				    DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
			}
			VERIFY(0 == zap_add_int(mos,
			    origin->ds_phys->ds_next_clones_obj,
			    dsobj, tx));
		}

		dmu_buf_will_dirty(dd->dd_dbuf, tx);
		dd->dd_phys->dd_origin_obj = origin->ds_object;
	}

	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
		dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;

	dmu_buf_rele(dbuf, FTAG);

	dmu_buf_will_dirty(dd->dd_dbuf, tx);
	dd->dd_phys->dd_head_dataset_obj = dsobj;

	return (dsobj);
}

uint64_t
dsl_dataset_create_sync(dsl_dir_t *pdd, const char *lastname,
    dsl_dataset_t *origin, uint64_t flags, cred_t *cr, dmu_tx_t *tx)
{
	dsl_pool_t *dp = pdd->dd_pool;
	uint64_t dsobj, ddobj;
	dsl_dir_t *dd;

	ASSERT(lastname[0] != '@');

	ddobj = dsl_dir_create_sync(dp, pdd, lastname, tx);
	VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));

	dsobj = dsl_dataset_create_sync_dd(dd, origin, flags, tx);

	dsl_deleg_set_create_perms(dd, tx, cr);

	dsl_dir_close(dd, FTAG);

	return (dsobj);
}

struct destroyarg {
	dsl_sync_task_group_t *dstg;
	char *snapname;
	char *failed;
	boolean_t defer;
};

static int
dsl_snapshot_destroy_one(char *name, void *arg)
{
	struct destroyarg *da = arg;
	dsl_dataset_t *ds;
	int err;
	char *dsname;

	dsname = kmem_asprintf("%s@%s", name, da->snapname);
	err = dsl_dataset_own(dsname, B_TRUE, da->dstg, &ds);
	strfree(dsname);
	if (err == 0) {
		struct dsl_ds_destroyarg *dsda;

		dsl_dataset_make_exclusive(ds, da->dstg);
		if (ds->ds_objset != NULL) {
			dmu_objset_evict(ds->ds_objset);
			ds->ds_objset = NULL;
		}
		dsda = kmem_zalloc(sizeof (struct dsl_ds_destroyarg), KM_SLEEP);
		dsda->ds = ds;
		dsda->defer = da->defer;
		dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
		    dsl_dataset_destroy_sync, dsda, da->dstg, 0);
	} else if (err == ENOENT) {
		err = 0;
	} else {
		(void) strcpy(da->failed, name);
	}
	return (err);
}

/*
 * Destroy 'snapname' in all descendants of 'fsname'.
 */
#pragma weak dmu_snapshots_destroy = dsl_snapshots_destroy
int
dsl_snapshots_destroy(char *fsname, char *snapname, boolean_t defer)
{
	int err;
	struct destroyarg da;
	dsl_sync_task_t *dst;
	spa_t *spa;

	err = spa_open(fsname, &spa, FTAG);
	if (err)
		return (err);
	da.dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
	da.snapname = snapname;
	da.failed = fsname;
	da.defer = defer;

	err = dmu_objset_find(fsname,
	    dsl_snapshot_destroy_one, &da, DS_FIND_CHILDREN);

	if (err == 0)
		err = dsl_sync_task_group_wait(da.dstg);

	for (dst = list_head(&da.dstg->dstg_tasks); dst;
	    dst = list_next(&da.dstg->dstg_tasks, dst)) {
		struct dsl_ds_destroyarg *dsda = dst->dst_arg1;
		dsl_dataset_t *ds = dsda->ds;

		/*
		 * Return the file system name that triggered the error
		 */
		if (dst->dst_err) {
			dsl_dataset_name(ds, fsname);
			*strchr(fsname, '@') = '\0';
		}
		ASSERT3P(dsda->rm_origin, ==, NULL);
		dsl_dataset_disown(ds, da.dstg);
		kmem_free(dsda, sizeof (struct dsl_ds_destroyarg));
	}

	dsl_sync_task_group_destroy(da.dstg);
	spa_close(spa, FTAG);
	return (err);
}

static boolean_t
dsl_dataset_might_destroy_origin(dsl_dataset_t *ds)
{
	boolean_t might_destroy = B_FALSE;

	mutex_enter(&ds->ds_lock);
	if (ds->ds_phys->ds_num_children == 2 && ds->ds_userrefs == 0 &&
	    DS_IS_DEFER_DESTROY(ds))
		might_destroy = B_TRUE;
	mutex_exit(&ds->ds_lock);

	return (might_destroy);
}

/*
 * If we're removing a clone, and these three conditions are true:
 *	1) the clone's origin has no other children
 *	2) the clone's origin has no user references
 *	3) the clone's origin has been marked for deferred destruction
 * Then, prepare to remove the origin as part of this sync task group.
 */
static int
dsl_dataset_origin_rm_prep(struct dsl_ds_destroyarg *dsda, void *tag)
{
	dsl_dataset_t *ds = dsda->ds;
	dsl_dataset_t *origin = ds->ds_prev;

	if (dsl_dataset_might_destroy_origin(origin)) {
		char *name;
		int namelen;
		int error;

		namelen = dsl_dataset_namelen(origin) + 1;
		name = kmem_alloc(namelen, KM_SLEEP);
		dsl_dataset_name(origin, name);
#ifdef _KERNEL
		error = zfs_unmount_snap(name, NULL);
		if (error) {
			kmem_free(name, namelen);
			return (error);
		}
#endif
		error = dsl_dataset_own(name, B_TRUE, tag, &origin);
		kmem_free(name, namelen);
		if (error)
			return (error);
		dsda->rm_origin = origin;
		dsl_dataset_make_exclusive(origin, tag);

		if (origin->ds_objset != NULL) {
			dmu_objset_evict(origin->ds_objset);
			origin->ds_objset = NULL;
		}
	}

	return (0);
}

/*
 * ds must be opened as OWNER.  On return (whether successful or not),
 * ds will be closed and caller can no longer dereference it.
 */
int
dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
{
	int err;
	dsl_sync_task_group_t *dstg;
	objset_t *os;
	dsl_dir_t *dd;
	uint64_t obj;
	struct dsl_ds_destroyarg dsda = {0};

	dsda.ds = ds;

	if (dsl_dataset_is_snapshot(ds)) {
		/* Destroying a snapshot is simpler */
		dsl_dataset_make_exclusive(ds, tag);

		if (ds->ds_objset != NULL) {
			dmu_objset_evict(ds->ds_objset);
			ds->ds_objset = NULL;
		}
		dsda.defer = defer;
		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
		    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
		    &dsda, tag, 0);
		ASSERT3P(dsda.rm_origin, ==, NULL);
		goto out;
	} else if (defer) {
		err = EINVAL;
		goto out;
	}

	dd = ds->ds_dir;

	/*
	 * Check for errors and mark this ds as inconsistent, in
	 * case we crash while freeing the objects.
	 */
	err = dsl_sync_task_do(dd->dd_pool, dsl_dataset_destroy_begin_check,
	    dsl_dataset_destroy_begin_sync, ds, NULL, 0);
	if (err)
		goto out;

	err = dmu_objset_from_ds(ds, &os);
	if (err)
		goto out;

	/*
	 * remove the objects in open context, so that we won't
	 * have too much to do in syncing context.
	 */
	for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
	    ds->ds_phys->ds_prev_snap_txg)) {
		/*
		 * Ignore errors; if there is not enough disk space
		 * we will deal with it in dsl_dataset_destroy_sync().
		 */
		(void) dmu_free_object(os, obj);
	}

	/*
	 * We need to sync out all in-flight IO before we try to evict
	 * (the dataset evict func is trying to clear the cached entries
	 * for this dataset in the ARC).
	 */
	txg_wait_synced(dd->dd_pool, 0);

	/*
	 * If we managed to free all the objects in open
	 * context, the user space accounting should be zero.
	 */
	if (ds->ds_phys->ds_bp.blk_fill == 0 &&
	    dmu_objset_userused_enabled(os)) {
		uint64_t count;

		ASSERT(zap_count(os, DMU_USERUSED_OBJECT, &count) != 0 ||
		    count == 0);
		ASSERT(zap_count(os, DMU_GROUPUSED_OBJECT, &count) != 0 ||
		    count == 0);
	}

	if (err != ESRCH)
		goto out;

	rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
	err = dsl_dir_open_obj(dd->dd_pool, dd->dd_object, NULL, FTAG, &dd);
	rw_exit(&dd->dd_pool->dp_config_rwlock);

	if (err)
		goto out;

	if (ds->ds_objset) {
		/*
		 * We need to sync out all in-flight IO before we try
		 * to evict (the dataset evict func is trying to clear
		 * the cached entries for this dataset in the ARC).
		 */
		txg_wait_synced(dd->dd_pool, 0);
	}

	/*
	 * Blow away the dsl_dir + head dataset.
	 */
	dsl_dataset_make_exclusive(ds, tag);
	if (ds->ds_objset) {
		dmu_objset_evict(ds->ds_objset);
		ds->ds_objset = NULL;
	}

	/*
	 * If we're removing a clone, we might also need to remove its
	 * origin.
	 */
	do {
		dsda.need_prep = B_FALSE;
		if (dsl_dir_is_clone(dd)) {
			err = dsl_dataset_origin_rm_prep(&dsda, tag);
			if (err) {
				dsl_dir_close(dd, FTAG);
				goto out;
			}
		}

		dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
		dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
		    dsl_dataset_destroy_sync, &dsda, tag, 0);
		dsl_sync_task_create(dstg, dsl_dir_destroy_check,
		    dsl_dir_destroy_sync, dd, FTAG, 0);
		err = dsl_sync_task_group_wait(dstg);
		dsl_sync_task_group_destroy(dstg);

		/*
		 * We could be racing against 'zfs release' or 'zfs destroy -d'
		 * on the origin snap, in which case we can get EBUSY if we
		 * needed to destroy the origin snap but were not ready to
		 * do so.
		 */
		if (dsda.need_prep) {
			ASSERT(err == EBUSY);
			ASSERT(dsl_dir_is_clone(dd));
			ASSERT(dsda.rm_origin == NULL);
		}
	} while (dsda.need_prep);

	if (dsda.rm_origin != NULL)
		dsl_dataset_disown(dsda.rm_origin, tag);

	/* if it is successful, dsl_dir_destroy_sync will close the dd */
	if (err)
		dsl_dir_close(dd, FTAG);
out:
	dsl_dataset_disown(ds, tag);
	return (err);
}

blkptr_t *
dsl_dataset_get_blkptr(dsl_dataset_t *ds)
{
	return (&ds->ds_phys->ds_bp);
}

void
dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
{
	ASSERT(dmu_tx_is_syncing(tx));
	/* If it's the meta-objset, set dp_meta_rootbp */
	if (ds == NULL) {
		tx->tx_pool->dp_meta_rootbp = *bp;
	} else {
		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		ds->ds_phys->ds_bp = *bp;
	}
}

spa_t *
dsl_dataset_get_spa(dsl_dataset_t *ds)
{
	return (ds->ds_dir->dd_pool->dp_spa);
}

void
dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
{
	dsl_pool_t *dp;

	if (ds == NULL) /* this is the meta-objset */
		return;

	ASSERT(ds->ds_objset != NULL);

	if (ds->ds_phys->ds_next_snap_obj != 0)
		panic("dirtying snapshot!");

	dp = ds->ds_dir->dd_pool;

	if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
		/* up the hold count until we can be written out */
		dmu_buf_add_ref(ds->ds_dbuf, ds);
	}
}

/*
 * The unique space in the head dataset can be calculated by subtracting
 * the space used in the most recent snapshot that is still in use in
 * this file system from the space currently in use.  To figure out the
 * space in the most recent snapshot still in use, we need to take the
 * total space used in the snapshot and subtract out the space that has
 * been freed up since the snapshot was taken.
 */
static void
dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
{
	uint64_t mrs_used;
	uint64_t dlused, dlcomp, dluncomp;

	ASSERT(ds->ds_object == ds->ds_dir->dd_phys->dd_head_dataset_obj);

	if (ds->ds_phys->ds_prev_snap_obj != 0)
		mrs_used = ds->ds_prev->ds_phys->ds_used_bytes;
	else
		mrs_used = 0;

	VERIFY(0 == bplist_space(&ds->ds_deadlist, &dlused, &dlcomp,
	    &dluncomp));

	ASSERT3U(dlused, <=, mrs_used);
	ds->ds_phys->ds_unique_bytes =
	    ds->ds_phys->ds_used_bytes - (mrs_used - dlused);

	if (!DS_UNIQUE_IS_ACCURATE(ds) &&
	    spa_version(ds->ds_dir->dd_pool->dp_spa) >=
	    SPA_VERSION_UNIQUE_ACCURATE)
		ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
}
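/*
 * Worked example for the calculation above (hypothetical numbers): if
 * the head uses 100 bytes, the most recent snapshot used 60, and 15 of
 * those bytes have since been freed (i.e. are on the deadlist), then
 * the snapshot still shares 60 - 15 = 45 bytes with the head, giving
 * unique = 100 - 45 = 55 bytes.
 */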

static uint64_t
dsl_dataset_unique(dsl_dataset_t *ds)
{
	if (!DS_UNIQUE_IS_ACCURATE(ds) && !dsl_dataset_is_snapshot(ds))
		dsl_dataset_recalc_head_uniq(ds);

	return (ds->ds_phys->ds_unique_bytes);
}

struct killarg {
	dsl_dataset_t *ds;
	dmu_tx_t *tx;
};

/* ARGSUSED */
static int
kill_blkptr(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_t *zb, const dnode_phys_t *dnp, void *arg)
{
	struct killarg *ka = arg;
	dmu_tx_t *tx = ka->tx;

	if (bp == NULL)
		return (0);

	if (zb->zb_level == ZB_ZIL_LEVEL) {
		ASSERT(zilog != NULL);
		/*
		 * It's a block in the intent log.  It has no
		 * accounting, so just free it.
		 */
		dsl_free(ka->tx->tx_pool, ka->tx->tx_txg, bp);
	} else {
		ASSERT(zilog == NULL);
		ASSERT3U(bp->blk_birth, >, ka->ds->ds_phys->ds_prev_snap_txg);
		(void) dsl_dataset_block_kill(ka->ds, bp, tx, B_FALSE);
	}

	return (0);
}

/* ARGSUSED */
static int
dsl_dataset_destroy_begin_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t count;
	int err;

	/*
	 * Can't delete a head dataset if there are snapshots of it.
	 * (Except if the only snapshots are from the branch we cloned
	 * from.)
	 */
	if (ds->ds_prev != NULL &&
	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
		return (EBUSY);

	/*
	 * This is really a dsl_dir thing, but check it here so that
	 * we'll be less likely to leave this dataset inconsistent &
	 * nearly destroyed.
	 */
	err = zap_count(mos, ds->ds_dir->dd_phys->dd_child_dir_zapobj, &count);
	if (err)
		return (err);
	if (count != 0)
		return (EEXIST);

	return (0);
}

/* ARGSUSED */
static void
dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/* Mark it as inconsistent on-disk, in case we crash */
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;

	spa_history_internal_log(LOG_DS_DESTROY_BEGIN, dp->dp_spa, tx,
	    cr, "dataset = %llu", ds->ds_object);
}

static int
dsl_dataset_origin_check(struct dsl_ds_destroyarg *dsda, void *tag,
    dmu_tx_t *tx)
{
	dsl_dataset_t *ds = dsda->ds;
	dsl_dataset_t *ds_prev = ds->ds_prev;

	if (dsl_dataset_might_destroy_origin(ds_prev)) {
		struct dsl_ds_destroyarg ndsda = {0};

		/*
		 * If we're not prepared to remove the origin, don't remove
		 * the clone either.
		 */
		if (dsda->rm_origin == NULL) {
			dsda->need_prep = B_TRUE;
			return (EBUSY);
		}

		ndsda.ds = ds_prev;
		ndsda.is_origin_rm = B_TRUE;
		return (dsl_dataset_destroy_check(&ndsda, tag, tx));
	}

	/*
	 * If we're not going to remove the origin after all,
	 * undo the open context setup.
	 */
	if (dsda->rm_origin != NULL) {
		dsl_dataset_disown(dsda->rm_origin, tag);
		dsda->rm_origin = NULL;
	}

	return (0);
}

/* ARGSUSED */
int
dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	struct dsl_ds_destroyarg *dsda = arg1;
	dsl_dataset_t *ds = dsda->ds;

	/* we have an owner hold, so no one else can destroy us */
	ASSERT(!DSL_DATASET_IS_DESTROYED(ds));

	/*
	 * Only allow deferred destroy on pools that support it.
	 * NOTE: deferred destroy is only supported on snapshots.
	 */
	if (dsda->defer) {
		if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
		    SPA_VERSION_USERREFS)
			return (ENOTSUP);
		ASSERT(dsl_dataset_is_snapshot(ds));
		return (0);
	}

	/*
	 * Can't delete a head dataset if there are snapshots of it.
	 * (Except if the only snapshots are from the branch we cloned
	 * from.)
	 */
	if (ds->ds_prev != NULL &&
	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
		return (EBUSY);

	/*
	 * If we made changes this txg, traverse_dsl_dataset won't find
	 * them.  Try again.
	 */
	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
		return (EAGAIN);

	if (dsl_dataset_is_snapshot(ds)) {
		/*
		 * If this snapshot has an elevated user reference count,
		 * we can't destroy it yet.
		 */
		if (ds->ds_userrefs > 0 && !dsda->releasing)
			return (EBUSY);

		mutex_enter(&ds->ds_lock);
		/*
		 * Can't delete a branch point. However, if we're destroying
		 * a clone and removing its origin due to it having a user
		 * hold count of 0 and having been marked for deferred destroy,
		 * it's OK for the origin to have a single clone.
		 */
		if (ds->ds_phys->ds_num_children >
		    (dsda->is_origin_rm ? 2 : 1)) {
			mutex_exit(&ds->ds_lock);
			return (EEXIST);
		}
		mutex_exit(&ds->ds_lock);
	} else if (dsl_dir_is_clone(ds->ds_dir)) {
		return (dsl_dataset_origin_check(dsda, arg2, tx));
	}

	/* XXX we should do some i/o error checking... */
	return (0);
}

struct refsarg {
	kmutex_t lock;
	boolean_t gone;
	kcondvar_t cv;
};

/* ARGSUSED */
static void
dsl_dataset_refs_gone(dmu_buf_t *db, void *argv)
{
	struct refsarg *arg = argv;

	mutex_enter(&arg->lock);
	arg->gone = TRUE;
	cv_signal(&arg->cv);
	mutex_exit(&arg->lock);
}

static void
dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
{
	struct refsarg arg;

	mutex_init(&arg.lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&arg.cv, NULL, CV_DEFAULT, NULL);
	arg.gone = FALSE;
	(void) dmu_buf_update_user(ds->ds_dbuf, ds, &arg, &ds->ds_phys,
	    dsl_dataset_refs_gone);
	dmu_buf_rele(ds->ds_dbuf, tag);
	mutex_enter(&arg.lock);
	while (!arg.gone)
		cv_wait(&arg.cv, &arg.lock);
	ASSERT(arg.gone);
	mutex_exit(&arg.lock);
	ds->ds_dbuf = NULL;
	ds->ds_phys = NULL;
	mutex_destroy(&arg.lock);
	cv_destroy(&arg.cv);
}
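/*
 * Note: dsl_dataset_drain_refs() swaps in dsl_dataset_refs_gone() as the
 * dbuf's eviction callback and then blocks until every outstanding
 * reference has been dropped, at which point the callback signals the cv.
 */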

static void
remove_from_next_clones(dsl_dataset_t *ds, uint64_t obj, dmu_tx_t *tx)
{
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
	uint64_t count;
	int err;

	ASSERT(ds->ds_phys->ds_num_children >= 2);
	err = zap_remove_int(mos, ds->ds_phys->ds_next_clones_obj, obj, tx);
	/*
	 * The err should not be ENOENT, but a bug in a previous version
	 * of the code could cause upgrade_clones_cb() to not set
	 * ds_next_snap_obj when it should, leading to a missing entry.
	 * If we knew that the pool was created after
	 * SPA_VERSION_NEXT_CLONES, we could assert that it isn't
	 * ENOENT.  However, at least we can check that we don't have
	 * too many entries in the next_clones_obj even after failing to
	 * remove this one.
	 */
	if (err != ENOENT) {
		VERIFY3U(err, ==, 0);
	}
	ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
	    &count));
	ASSERT3U(count, <=, ds->ds_phys->ds_num_children - 2);
}

void
dsl_dataset_destroy_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
{
	struct dsl_ds_destroyarg *dsda = arg1;
	dsl_dataset_t *ds = dsda->ds;
	int err;
	int after_branch_point = FALSE;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;
	dsl_dataset_t *ds_prev = NULL;
	uint64_t obj;

	ASSERT(ds->ds_owner);
	ASSERT(dsda->defer || ds->ds_phys->ds_num_children <= 1);
	ASSERT(ds->ds_prev == NULL ||
	    ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
	ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);

	if (dsda->defer) {
		ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
		if (ds->ds_userrefs > 0 || ds->ds_phys->ds_num_children > 1) {
			dmu_buf_will_dirty(ds->ds_dbuf, tx);
			ds->ds_phys->ds_flags |= DS_FLAG_DEFER_DESTROY;
			return;
		}
	}

	/* signal any waiters that this dataset is going away */
	mutex_enter(&ds->ds_lock);
	ds->ds_owner = dsl_reaper;
	cv_broadcast(&ds->ds_exclusive_cv);
	mutex_exit(&ds->ds_lock);

	/* Remove our reservation */
	if (ds->ds_reserved != 0) {
		uint64_t val = 0;
		dsl_dataset_set_reservation_sync(ds, &val, cr, tx);
		ASSERT3U(ds->ds_reserved, ==, 0);
	}

	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));

	dsl_pool_ds_destroyed(ds, tx);

	obj = ds->ds_object;

	if (ds->ds_phys->ds_prev_snap_obj != 0) {
		if (ds->ds_prev) {
			ds_prev = ds->ds_prev;
		} else {
			VERIFY(0 == dsl_dataset_hold_obj(dp,
			    ds->ds_phys->ds_prev_snap_obj, FTAG, &ds_prev));
		}
		after_branch_point =
		    (ds_prev->ds_phys->ds_next_snap_obj != obj);

		dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
		if (after_branch_point &&
		    ds_prev->ds_phys->ds_next_clones_obj != 0) {
			remove_from_next_clones(ds_prev, obj, tx);
			if (ds->ds_phys->ds_next_snap_obj != 0) {
				VERIFY(0 == zap_add_int(mos,
				    ds_prev->ds_phys->ds_next_clones_obj,
				    ds->ds_phys->ds_next_snap_obj, tx));
			}
		}
		if (after_branch_point &&
		    ds->ds_phys->ds_next_snap_obj == 0) {
			/* This clone is toast. */
			ASSERT(ds_prev->ds_phys->ds_num_children > 1);
			ds_prev->ds_phys->ds_num_children--;

			/*
			 * If the clone's origin has no other clones, no
			 * user holds, and has been marked for deferred
			 * deletion, then we should have done the necessary
			 * destroy setup for it.
			 */
			if (ds_prev->ds_phys->ds_num_children == 1 &&
			    ds_prev->ds_userrefs == 0 &&
			    DS_IS_DEFER_DESTROY(ds_prev)) {
				ASSERT3P(dsda->rm_origin, !=, NULL);
			} else {
				ASSERT3P(dsda->rm_origin, ==, NULL);
			}
		} else if (!after_branch_point) {
			ds_prev->ds_phys->ds_next_snap_obj =
			    ds->ds_phys->ds_next_snap_obj;
		}
	}

	if (ds->ds_phys->ds_next_snap_obj != 0) {
		blkptr_t bp;
		dsl_dataset_t *ds_next;
		uint64_t itor = 0;
		uint64_t old_unique;
		int64_t used = 0, compressed = 0, uncompressed = 0;

		VERIFY(0 == dsl_dataset_hold_obj(dp,
		    ds->ds_phys->ds_next_snap_obj, FTAG, &ds_next));
		ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);

		old_unique = dsl_dataset_unique(ds_next);

		dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
		ds_next->ds_phys->ds_prev_snap_obj =
		    ds->ds_phys->ds_prev_snap_obj;
		ds_next->ds_phys->ds_prev_snap_txg =
		    ds->ds_phys->ds_prev_snap_txg;
		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
		    ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);

		/*
		 * Transfer to our deadlist (which will become next's
		 * new deadlist) any entries from next's current
		 * deadlist which were born before prev, and free the
		 * other entries.
		 *
		 * XXX we're doing this long task with the config lock held
		 */
		while (bplist_iterate(&ds_next->ds_deadlist, &itor, &bp) == 0) {
			if (bp.blk_birth <= ds->ds_phys->ds_prev_snap_txg) {
				VERIFY(0 == bplist_enqueue(&ds->ds_deadlist,
				    &bp, tx));
				if (ds_prev && !after_branch_point &&
				    bp.blk_birth >
				    ds_prev->ds_phys->ds_prev_snap_txg) {
					ds_prev->ds_phys->ds_unique_bytes +=
					    bp_get_dsize_sync(dp->dp_spa, &bp);
				}
			} else {
				used += bp_get_dsize_sync(dp->dp_spa, &bp);
				compressed += BP_GET_PSIZE(&bp);
				uncompressed += BP_GET_UCSIZE(&bp);
				dsl_free(dp, tx->tx_txg, &bp);
			}
		}

		ASSERT3U(used, ==, ds->ds_phys->ds_unique_bytes);

		/* change snapused */
		dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
		    -used, -compressed, -uncompressed, tx);

		/* free next's deadlist */
		bplist_close(&ds_next->ds_deadlist);
		bplist_destroy(mos, ds_next->ds_phys->ds_deadlist_obj, tx);

		/* set next's deadlist to our deadlist */
		bplist_close(&ds->ds_deadlist);
		ds_next->ds_phys->ds_deadlist_obj =
		    ds->ds_phys->ds_deadlist_obj;
		VERIFY(0 == bplist_open(&ds_next->ds_deadlist, mos,
		    ds_next->ds_phys->ds_deadlist_obj));
		ds->ds_phys->ds_deadlist_obj = 0;

		if (ds_next->ds_phys->ds_next_snap_obj != 0) {
			/*
			 * Update next's unique to include blocks which
			 * were previously shared by only this snapshot
			 * and it.  Those blocks will be born after the
			 * prev snap and before this snap, and will have
			 * died after the next snap and before the one
			 * after that (i.e., be on the snap after next's
			 * deadlist).
			 *
			 * XXX we're doing this long task with the
			 * config lock held
			 */
			dsl_dataset_t *ds_after_next;
			uint64_t space;

			VERIFY(0 == dsl_dataset_hold_obj(dp,
			    ds_next->ds_phys->ds_next_snap_obj,
			    FTAG, &ds_after_next));

			VERIFY(0 ==
			    bplist_space_birthrange(&ds_after_next->ds_deadlist,
			    ds->ds_phys->ds_prev_snap_txg,
			    ds->ds_phys->ds_creation_txg, &space));
			ds_next->ds_phys->ds_unique_bytes += space;

			dsl_dataset_rele(ds_after_next, FTAG);
			ASSERT3P(ds_next->ds_prev, ==, NULL);
		} else {
			ASSERT3P(ds_next->ds_prev, ==, ds);
			dsl_dataset_drop_ref(ds_next->ds_prev, ds_next);
			ds_next->ds_prev = NULL;
			if (ds_prev) {
				VERIFY(0 == dsl_dataset_get_ref(dp,
				    ds->ds_phys->ds_prev_snap_obj,
				    ds_next, &ds_next->ds_prev));
			}

			dsl_dataset_recalc_head_uniq(ds_next);

			/*
			 * Reduce the amount of our unconsumed refreservation
			 * being charged to our parent by the amount of
			 * new unique data we have gained.
			 */
			if (old_unique < ds_next->ds_reserved) {
				int64_t mrsdelta;
				uint64_t new_unique =
				    ds_next->ds_phys->ds_unique_bytes;

				ASSERT(old_unique <= new_unique);
				mrsdelta = MIN(new_unique - old_unique,
				    ds_next->ds_reserved - old_unique);
				dsl_dir_diduse_space(ds->ds_dir,
				    DD_USED_REFRSRV, -mrsdelta, 0, 0, tx);
			}
		}
		dsl_dataset_rele(ds_next, FTAG);
	} else {
		/*
		 * There's no next snapshot, so this is a head dataset.
		 * Destroy the deadlist.  Unless it's a clone, the
		 * deadlist should be empty.  (If it's a clone, it's
		 * safe to ignore the deadlist contents.)
		 */
		struct killarg ka;

		ASSERT(after_branch_point || bplist_empty(&ds->ds_deadlist));
		bplist_close(&ds->ds_deadlist);
		bplist_destroy(mos, ds->ds_phys->ds_deadlist_obj, tx);
		ds->ds_phys->ds_deadlist_obj = 0;

		/*
		 * Free everything that we point to (that's born after
		 * the previous snapshot, if we are a clone)
		 *
		 * NB: this should be very quick, because we already
		 * freed all the objects in open context.
		 */
		ka.ds = ds;
		ka.tx = tx;
		err = traverse_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
		    TRAVERSE_POST, kill_blkptr, &ka);
		ASSERT3U(err, ==, 0);
		ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
		    ds->ds_phys->ds_unique_bytes == 0);

		if (ds->ds_prev != NULL) {
			dsl_dataset_rele(ds->ds_prev, ds);
			ds->ds_prev = ds_prev = NULL;
		}
	}

	if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
		/* Erase the link in the dir */
		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
		ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
		ASSERT(ds->ds_phys->ds_snapnames_zapobj != 0);
		err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
		ASSERT(err == 0);
	} else {
		/* remove from snapshot namespace */
		dsl_dataset_t *ds_head;
		ASSERT(ds->ds_phys->ds_snapnames_zapobj == 0);
		VERIFY(0 == dsl_dataset_hold_obj(dp,
		    ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ds_head));
		VERIFY(0 == dsl_dataset_get_snapname(ds));
#ifdef ZFS_DEBUG
		{
			uint64_t val;

			err = dsl_dataset_snap_lookup(ds_head,
			    ds->ds_snapname, &val);
			ASSERT3U(err, ==, 0);
			ASSERT3U(val, ==, obj);
		}
#endif
		err = dsl_dataset_snap_remove(ds_head, ds->ds_snapname, tx);
		ASSERT(err == 0);
		dsl_dataset_rele(ds_head, FTAG);
	}

	if (ds_prev && ds->ds_prev != ds_prev)
		dsl_dataset_rele(ds_prev, FTAG);

	spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
	spa_history_internal_log(LOG_DS_DESTROY, dp->dp_spa, tx,
	    cr, "dataset = %llu", ds->ds_object);

	if (ds->ds_phys->ds_next_clones_obj != 0) {
		uint64_t count;
		ASSERT(0 == zap_count(mos,
		    ds->ds_phys->ds_next_clones_obj, &count) && count == 0);
		VERIFY(0 == dmu_object_free(mos,
		    ds->ds_phys->ds_next_clones_obj, tx));
	}
	if (ds->ds_phys->ds_props_obj != 0)
		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_props_obj, tx));
	if (ds->ds_phys->ds_userrefs_obj != 0)
		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_userrefs_obj, tx));
	dsl_dir_close(ds->ds_dir, ds);
	ds->ds_dir = NULL;
	dsl_dataset_drain_refs(ds, tag);
	VERIFY(0 == dmu_object_free(mos, obj, tx));

	if (dsda->rm_origin) {
		/*
		 * Remove the origin of the clone we just destroyed.
		 */
		struct dsl_ds_destroyarg ndsda = {0};

		ndsda.ds = dsda->rm_origin;
		dsl_dataset_destroy_sync(&ndsda, tag, cr, tx);
	}
}

static int
dsl_dataset_snapshot_reserve_space(dsl_dataset_t *ds, dmu_tx_t *tx)
{
	uint64_t asize;

	if (!dmu_tx_is_syncing(tx))
		return (0);

	/*
	 * If there's an fs-only reservation, any blocks that might become
	 * owned by the snapshot dataset must be accommodated by space
	 * outside of the reservation.
	 */
	asize = MIN(dsl_dataset_unique(ds), ds->ds_reserved);
	if (asize > dsl_dir_space_available(ds->ds_dir, NULL, 0, FALSE))
		return (ENOSPC);

	/*
	 * Propagate any reserved space for this snapshot to other
	 * snapshot checks in this sync group.
	 */
	if (asize > 0)
		dsl_dir_willuse_space(ds->ds_dir, asize, tx);

	return (0);
}

/* ARGSUSED */
int
dsl_dataset_snapshot_check(void *arg1, void *arg2, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	const char *snapname = arg2;
	int err;
	uint64_t value;

	/*
	 * We don't allow multiple snapshots of the same txg.  If there
	 * is already one, try again.
	 */
	if (ds->ds_phys->ds_prev_snap_txg >= tx->tx_txg)
		return (EAGAIN);

1847 	/*
1848 	 * Check for a conflicting snapshot name.
1849 	 */
1850 	err = dsl_dataset_snap_lookup(ds, snapname, &value);
1851 	if (err == 0)
1852 		return (EEXIST);
1853 	if (err != ENOENT)
1854 		return (err);
1855 
1856 	/*
1857 	 * Check that the snapshot's full name is not too long: the dataset
1858 	 * name's length + 1 for the @-sign + the snapshot name's length.
1859 	 */
1860 	if (dsl_dataset_namelen(ds) + 1 + strlen(snapname) >= MAXNAMELEN)
1861 		return (ENAMETOOLONG);
1862 
1863 	err = dsl_dataset_snapshot_reserve_space(ds, tx);
1864 	if (err)
1865 		return (err);
1866 
1867 	ds->ds_trysnap_txg = tx->tx_txg;
1868 	return (0);
1869 }
1870 
1871 void
1872 dsl_dataset_snapshot_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
1873 {
1874 	dsl_dataset_t *ds = arg1;
1875 	const char *snapname = arg2;
1876 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1877 	dmu_buf_t *dbuf;
1878 	dsl_dataset_phys_t *dsphys;
1879 	uint64_t dsobj, crtxg;
1880 	objset_t *mos = dp->dp_meta_objset;
1881 	int err;
1882 
1883 	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
1884 
1885 	/*
1886 	 * The origin's ds_creation_txg has to be < TXG_INITIAL
1887 	 */
1888 	if (strcmp(snapname, ORIGIN_DIR_NAME) == 0)
1889 		crtxg = 1;
1890 	else
1891 		crtxg = tx->tx_txg;
1892 
1893 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
1894 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
1895 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
1896 	dmu_buf_will_dirty(dbuf, tx);
1897 	dsphys = dbuf->db_data;
1898 	bzero(dsphys, sizeof (dsl_dataset_phys_t));
1899 	dsphys->ds_dir_obj = ds->ds_dir->dd_object;
1900 	dsphys->ds_fsid_guid = unique_create();
1901 	(void) random_get_pseudo_bytes((void *)&dsphys->ds_guid,
1902 	    sizeof (dsphys->ds_guid));
1903 	dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
1904 	dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
1905 	dsphys->ds_next_snap_obj = ds->ds_object;
1906 	dsphys->ds_num_children = 1;
1907 	dsphys->ds_creation_time = gethrestime_sec();
1908 	dsphys->ds_creation_txg = crtxg;
1909 	dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
1910 	dsphys->ds_used_bytes = ds->ds_phys->ds_used_bytes;
1911 	dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
1912 	dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
1913 	dsphys->ds_flags = ds->ds_phys->ds_flags;
1914 	dsphys->ds_bp = ds->ds_phys->ds_bp;
1915 	dmu_buf_rele(dbuf, FTAG);
1916 
1917 	ASSERT3U(ds->ds_prev != 0, ==, ds->ds_phys->ds_prev_snap_obj != 0);
1918 	if (ds->ds_prev) {
1919 		uint64_t next_clones_obj =
1920 		    ds->ds_prev->ds_phys->ds_next_clones_obj;
1921 		ASSERT(ds->ds_prev->ds_phys->ds_next_snap_obj ==
1922 		    ds->ds_object ||
1923 		    ds->ds_prev->ds_phys->ds_num_children > 1);
1924 		if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
1925 			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
1926 			ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1927 			    ds->ds_prev->ds_phys->ds_creation_txg);
1928 			ds->ds_prev->ds_phys->ds_next_snap_obj = dsobj;
1929 		} else if (next_clones_obj != 0) {
1930 			remove_from_next_clones(ds->ds_prev,
1931 			    dsphys->ds_next_snap_obj, tx);
1932 			VERIFY3U(0, ==, zap_add_int(mos,
1933 			    next_clones_obj, dsobj, tx));
1934 		}
1935 	}
1936 
1937 	/*
1938 	 * If we have a reference-reservation on this dataset, we will
1939 	 * need to increase the amount of refreservation being charged
1940 	 * since our unique space is going to zero.
1941 	 */
1942 	if (ds->ds_reserved) {
1943 		int64_t add = MIN(dsl_dataset_unique(ds), ds->ds_reserved);
1944 		dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV,
1945 		    add, 0, 0, tx);
1946 	}
1947 
1948 	bplist_close(&ds->ds_deadlist);
1949 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1950 	ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, tx->tx_txg);
1951 	ds->ds_phys->ds_prev_snap_obj = dsobj;
1952 	ds->ds_phys->ds_prev_snap_txg = crtxg;
1953 	ds->ds_phys->ds_unique_bytes = 0;
1954 	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
1955 		ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
1956 	ds->ds_phys->ds_deadlist_obj =
1957 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
1958 	VERIFY(0 == bplist_open(&ds->ds_deadlist, mos,
1959 	    ds->ds_phys->ds_deadlist_obj));
1960 
1961 	dprintf("snap '%s' -> obj %llu\n", snapname, dsobj);
1962 	err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
1963 	    snapname, 8, 1, &dsobj, tx);
1964 	ASSERT(err == 0);
1965 
1966 	if (ds->ds_prev)
1967 		dsl_dataset_drop_ref(ds->ds_prev, ds);
1968 	VERIFY(0 == dsl_dataset_get_ref(dp,
1969 	    ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
1970 
1971 	dsl_pool_ds_snapshotted(ds, tx);
1972 
1973 	dsl_dir_snap_cmtime_update(ds->ds_dir);
1974 
1975 	spa_history_internal_log(LOG_DS_SNAPSHOT, dp->dp_spa, tx, cr,
1976 	    "dataset = %llu", dsobj);
1977 }
1978 
1979 void
1980 dsl_dataset_sync(dsl_dataset_t *ds, zio_t *zio, dmu_tx_t *tx)
1981 {
1982 	ASSERT(dmu_tx_is_syncing(tx));
1983 	ASSERT(ds->ds_objset != NULL);
1984 	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
1985 
1986 	/*
1987 	 * in case we had to change ds_fsid_guid when we opened it,
1988 	 * sync it out now.
1989 	 */
1990 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1991 	ds->ds_phys->ds_fsid_guid = ds->ds_fsid_guid;
1992 
1993 	dsl_dir_dirty(ds->ds_dir, tx);
1994 	dmu_objset_sync(ds->ds_objset, zio, tx);
1995 }
1996 
1997 void
1998 dsl_dataset_stats(dsl_dataset_t *ds, nvlist_t *nv)
1999 {
2000 	uint64_t refd, avail, uobjs, aobjs;
2001 
2002 	dsl_dir_stats(ds->ds_dir, nv);
2003 
2004 	dsl_dataset_space(ds, &refd, &avail, &uobjs, &aobjs);
2005 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_AVAILABLE, avail);
2006 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFERENCED, refd);
2007 
2008 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATION,
2009 	    ds->ds_phys->ds_creation_time);
2010 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATETXG,
2011 	    ds->ds_phys->ds_creation_txg);
2012 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFQUOTA,
2013 	    ds->ds_quota);
2014 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRESERVATION,
2015 	    ds->ds_reserved);
2016 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_GUID,
2017 	    ds->ds_phys->ds_guid);
2018 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_UNIQUE,
2019 	    dsl_dataset_unique(ds));
2020 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_OBJSETID,
2021 	    ds->ds_object);
2022 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS, ds->ds_userrefs);
2023 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
2024 	    DS_IS_DEFER_DESTROY(ds) ? 1 : 0);
2025 
2026 	if (ds->ds_phys->ds_next_snap_obj) {
2027 		/*
2028 		 * This is a snapshot; override the dd's space used with
2029 		 * our unique space and compression ratio.
2030 		 */
2031 		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USED,
2032 		    ds->ds_phys->ds_unique_bytes);
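		/*
		 * E.g., 300M of uncompressed data stored as 100M compressed
		 * yields a compressratio value of 300 here, rendered by
		 * userland as "3.00x".
		 */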
2033 		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_COMPRESSRATIO,
2034 		    ds->ds_phys->ds_compressed_bytes == 0 ? 100 :
2035 		    (ds->ds_phys->ds_uncompressed_bytes * 100 /
2036 		    ds->ds_phys->ds_compressed_bytes));
2037 	}
2038 }
2039 
2040 void
2041 dsl_dataset_fast_stat(dsl_dataset_t *ds, dmu_objset_stats_t *stat)
2042 {
2043 	stat->dds_creation_txg = ds->ds_phys->ds_creation_txg;
2044 	stat->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
2045 	stat->dds_guid = ds->ds_phys->ds_guid;
2046 	if (ds->ds_phys->ds_next_snap_obj) {
2047 		stat->dds_is_snapshot = B_TRUE;
2048 		stat->dds_num_clones = ds->ds_phys->ds_num_children - 1;
2049 	} else {
2050 		stat->dds_is_snapshot = B_FALSE;
2051 		stat->dds_num_clones = 0;
2052 	}
2053 
2054 	/* clone origin is really a dsl_dir thing... */
2055 	rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
2056 	if (dsl_dir_is_clone(ds->ds_dir)) {
2057 		dsl_dataset_t *ods;
2058 
2059 		VERIFY(0 == dsl_dataset_get_ref(ds->ds_dir->dd_pool,
2060 		    ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &ods));
2061 		dsl_dataset_name(ods, stat->dds_origin);
2062 		dsl_dataset_drop_ref(ods, FTAG);
2063 	} else {
2064 		stat->dds_origin[0] = '\0';
2065 	}
2066 	rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
2067 }
2068 
2069 uint64_t
2070 dsl_dataset_fsid_guid(dsl_dataset_t *ds)
2071 {
2072 	return (ds->ds_fsid_guid);
2073 }
2074 
2075 void
2076 dsl_dataset_space(dsl_dataset_t *ds,
2077     uint64_t *refdbytesp, uint64_t *availbytesp,
2078     uint64_t *usedobjsp, uint64_t *availobjsp)
2079 {
2080 	*refdbytesp = ds->ds_phys->ds_used_bytes;
2081 	*availbytesp = dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE);
2082 	if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes)
2083 		*availbytesp += ds->ds_reserved - ds->ds_phys->ds_unique_bytes;
2084 	if (ds->ds_quota != 0) {
2085 		/*
2086 		 * Adjust available bytes according to refquota
2087 		 */
2088 		if (*refdbytesp < ds->ds_quota)
2089 			*availbytesp = MIN(*availbytesp,
2090 			    ds->ds_quota - *refdbytesp);
2091 		else
2092 			*availbytesp = 0;
2093 	}
2094 	*usedobjsp = ds->ds_phys->ds_bp.blk_fill;
2095 	*availobjsp = DN_MAX_OBJECT - *usedobjsp;
2096 }
2097 
2098 boolean_t
2099 dsl_dataset_modified_since_lastsnap(dsl_dataset_t *ds)
2100 {
2101 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
2102 
2103 	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
2104 	    dsl_pool_sync_context(dp));
2105 	if (ds->ds_prev == NULL)
2106 		return (B_FALSE);
2107 	if (ds->ds_phys->ds_bp.blk_birth >
2108 	    ds->ds_prev->ds_phys->ds_creation_txg)
2109 		return (B_TRUE);
2110 	return (B_FALSE);
2111 }
2112 
2113 /* ARGSUSED */
2114 static int
2115 dsl_dataset_snapshot_rename_check(void *arg1, void *arg2, dmu_tx_t *tx)
2116 {
2117 	dsl_dataset_t *ds = arg1;
2118 	char *newsnapname = arg2;
2119 	dsl_dir_t *dd = ds->ds_dir;
2120 	dsl_dataset_t *hds;
2121 	uint64_t val;
2122 	int err;
2123 
2124 	err = dsl_dataset_hold_obj(dd->dd_pool,
2125 	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds);
2126 	if (err)
2127 		return (err);
2128 
2129 	/* new name better not be in use */
2130 	err = dsl_dataset_snap_lookup(hds, newsnapname, &val);
2131 	dsl_dataset_rele(hds, FTAG);
2132 
2133 	if (err == 0)
2134 		err = EEXIST;
2135 	else if (err == ENOENT)
2136 		err = 0;
2137 
2138 	/* dataset name + 1 for the "@" + the new snapshot name must fit */
2139 	if (dsl_dir_namelen(ds->ds_dir) + 1 + strlen(newsnapname) >= MAXNAMELEN)
2140 		err = ENAMETOOLONG;
2141 
2142 	return (err);
2143 }
2144 
2145 static void
2146 dsl_dataset_snapshot_rename_sync(void *arg1, void *arg2,
2147     cred_t *cr, dmu_tx_t *tx)
2148 {
2149 	dsl_dataset_t *ds = arg1;
2150 	const char *newsnapname = arg2;
2151 	dsl_dir_t *dd = ds->ds_dir;
2152 	objset_t *mos = dd->dd_pool->dp_meta_objset;
2153 	dsl_dataset_t *hds;
2154 	int err;
2155 
2156 	ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
2157 
2158 	VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
2159 	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds));
2160 
2161 	VERIFY(0 == dsl_dataset_get_snapname(ds));
2162 	err = dsl_dataset_snap_remove(hds, ds->ds_snapname, tx);
2163 	ASSERT3U(err, ==, 0);
2164 	mutex_enter(&ds->ds_lock);
2165 	(void) strcpy(ds->ds_snapname, newsnapname);
2166 	mutex_exit(&ds->ds_lock);
2167 	err = zap_add(mos, hds->ds_phys->ds_snapnames_zapobj,
2168 	    ds->ds_snapname, 8, 1, &ds->ds_object, tx);
2169 	ASSERT3U(err, ==, 0);
2170 
2171 	spa_history_internal_log(LOG_DS_RENAME, dd->dd_pool->dp_spa, tx,
2172 	    cr, "dataset = %llu", ds->ds_object);
2173 	dsl_dataset_rele(hds, FTAG);
2174 }
2175 
2176 struct renamesnaparg {
2177 	dsl_sync_task_group_t *dstg;
2178 	char failed[MAXPATHLEN];
2179 	char *oldsnap;
2180 	char *newsnap;
2181 };
2182 
2183 static int
2184 dsl_snapshot_rename_one(char *name, void *arg)
2185 {
2186 	struct renamesnaparg *ra = arg;
2187 	dsl_dataset_t *ds = NULL;
2188 	char *cp;
2189 	int err;
2190 
2191 	cp = name + strlen(name);
2192 	*cp = '@';
2193 	(void) strcpy(cp + 1, ra->oldsnap);
2194 
2195 	/*
2196 	 * For recursive snapshot renames the parent won't be changing,
2197 	 * so we just pass name for both the to and from arguments.
2198 	 */
2199 	err = zfs_secpolicy_rename_perms(name, name, CRED());
2200 	if (err == ENOENT) {
2201 		return (0);
2202 	} else if (err) {
2203 		(void) strcpy(ra->failed, name);
2204 		return (err);
2205 	}
2206 
2207 #ifdef _KERNEL
2208 	/*
2209 	 * Each filesystem undergoing rename needs its snapshot unmounted.
2210 	 */
2211 	(void) zfs_unmount_snap(name, NULL);
2212 #endif
2213 	err = dsl_dataset_hold(name, ra->dstg, &ds);
2214 	*cp = '\0';
2215 	if (err == ENOENT) {
2216 		return (0);
2217 	} else if (err) {
2218 		(void) strcpy(ra->failed, name);
2219 		return (err);
2220 	}
2221 
2222 	dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
2223 	    dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);
2224 
2225 	return (0);
2226 }
2227 
2228 static int
2229 dsl_recursive_rename(char *oldname, const char *newname)
2230 {
2231 	int err;
2232 	struct renamesnaparg *ra;
2233 	dsl_sync_task_t *dst;
2234 	spa_t *spa;
2235 	char *cp, *fsname = spa_strdup(oldname);
2236 	int len = strlen(oldname);
2237 
2238 	/* truncate the snapshot name to get the fsname */
2239 	cp = strchr(fsname, '@');
2240 	*cp = '\0';
2241 
2242 	err = spa_open(fsname, &spa, FTAG);
2243 	if (err) {
2244 		kmem_free(fsname, len + 1);
2245 		return (err);
2246 	}
2247 	ra = kmem_alloc(sizeof (struct renamesnaparg), KM_SLEEP);
2248 	ra->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
2249 
2250 	ra->oldsnap = strchr(oldname, '@') + 1;
2251 	ra->newsnap = strchr(newname, '@') + 1;
2252 	*ra->failed = '\0';
2253 
2254 	err = dmu_objset_find(fsname, dsl_snapshot_rename_one, ra,
2255 	    DS_FIND_CHILDREN);
2256 	kmem_free(fsname, len + 1);
2257 
2258 	if (err == 0) {
2259 		err = dsl_sync_task_group_wait(ra->dstg);
2260 	}
2261 
2262 	for (dst = list_head(&ra->dstg->dstg_tasks); dst;
2263 	    dst = list_next(&ra->dstg->dstg_tasks, dst)) {
2264 		dsl_dataset_t *ds = dst->dst_arg1;
2265 		if (dst->dst_err) {
2266 			dsl_dir_name(ds->ds_dir, ra->failed);
2267 			(void) strcat(ra->failed, "@");
2268 			(void) strcat(ra->failed, ra->newsnap);
2269 		}
2270 		dsl_dataset_rele(ds, ra->dstg);
2271 	}
2272 
2273 	if (err)
2274 		(void) strcpy(oldname, ra->failed);
2275 
2276 	dsl_sync_task_group_destroy(ra->dstg);
2277 	kmem_free(ra, sizeof (struct renamesnaparg));
2278 	spa_close(spa, FTAG);
2279 	return (err);
2280 }
2281 
2282 static int
2283 dsl_valid_rename(char *oldname, void *arg)
2284 {
2285 	int delta = *(int *)arg;
2286 
2287 	if (strlen(oldname) + delta >= MAXNAMELEN)
2288 		return (ENAMETOOLONG);
2289 
2290 	return (0);
2291 }
2292 
2293 #pragma weak dmu_objset_rename = dsl_dataset_rename
2294 int
2295 dsl_dataset_rename(char *oldname, const char *newname, boolean_t recursive)
2296 {
2297 	dsl_dir_t *dd;
2298 	dsl_dataset_t *ds;
2299 	const char *tail;
2300 	int err;
2301 
2302 	err = dsl_dir_open(oldname, FTAG, &dd, &tail);
2303 	if (err)
2304 		return (err);
2305 	/*
2306 	 * If there are more than 2 references, there may be holds
2307 	 * hanging around that haven't been cleared out yet.
2308 	 */
2309 	if (dmu_buf_refcount(dd->dd_dbuf) > 2)
2310 		txg_wait_synced(dd->dd_pool, 0);
2311 	if (tail == NULL) {
2312 		int delta = strlen(newname) - strlen(oldname);
2313 
2314 		/* if we're growing, validate child name lengths */
2315 		if (delta > 0)
2316 			err = dmu_objset_find(oldname, dsl_valid_rename,
2317 			    &delta, DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);
2318 
2319 		if (!err)
2320 			err = dsl_dir_rename(dd, newname);
2321 		dsl_dir_close(dd, FTAG);
2322 		return (err);
2323 	}
2324 	if (tail[0] != '@') {
2325 		/* the name ended in a nonexistent component */
2326 		dsl_dir_close(dd, FTAG);
2327 		return (ENOENT);
2328 	}
2329 
2330 	dsl_dir_close(dd, FTAG);
2331 
2332 	/* new name must be snapshot in same filesystem */
2333 	tail = strchr(newname, '@');
2334 	if (tail == NULL)
2335 		return (EINVAL);
2336 	tail++;
2337 	if (strncmp(oldname, newname, tail - newname) != 0)
2338 		return (EXDEV);
2339 
2340 	if (recursive) {
2341 		err = dsl_recursive_rename(oldname, newname);
2342 	} else {
2343 		err = dsl_dataset_hold(oldname, FTAG, &ds);
2344 		if (err)
2345 			return (err);
2346 
2347 		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
2348 		    dsl_dataset_snapshot_rename_check,
2349 		    dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
2350 
2351 		dsl_dataset_rele(ds, FTAG);
2352 	}
2353 
2354 	return (err);
2355 }
2356 
2357 struct promotenode {
2358 	list_node_t link;
2359 	dsl_dataset_t *ds;
2360 };
2361 
2362 struct promotearg {
2363 	list_t shared_snaps, origin_snaps, clone_snaps;
2364 	dsl_dataset_t *origin_origin, *origin_head;
2365 	uint64_t used, comp, uncomp, unique, cloneusedsnap, originusedsnap;
2366 	char *err_ds;
2367 };
2368 
2369 static int snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep);
2370 
2371 /* ARGSUSED */
2372 static int
2373 dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
2374 {
2375 	dsl_dataset_t *hds = arg1;
2376 	struct promotearg *pa = arg2;
2377 	struct promotenode *snap = list_head(&pa->shared_snaps);
2378 	dsl_dataset_t *origin_ds = snap->ds;
2379 	int err;
2380 
2381 	/* Check that it is a real clone */
2382 	if (!dsl_dir_is_clone(hds->ds_dir))
2383 		return (EINVAL);
2384 
2385 	/* Since this is so expensive, don't do the preliminary check */
2386 	if (!dmu_tx_is_syncing(tx))
2387 		return (0);
2388 
2389 	if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE)
2390 		return (EXDEV);
2391 
2392 	/* compute origin's new unique space */
2393 	snap = list_tail(&pa->clone_snaps);
2394 	ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2395 	err = bplist_space_birthrange(&snap->ds->ds_deadlist,
2396 	    origin_ds->ds_phys->ds_prev_snap_txg, UINT64_MAX, &pa->unique);
2397 	if (err)
2398 		return (err);
2399 
2400 	/*
2401 	 * Walk the snapshots that we are moving
2402 	 *
2403 	 * Compute space to transfer.  Consider the incremental changes
2404 	 * to used for each snapshot:
2405 	 * (my used) = (prev's used) + (blocks born) - (blocks killed)
2406 	 * So each snapshot gave birth to:
2407 	 * (blocks born) = (my used) - (prev's used) + (blocks killed)
2408 	 * So a sequence would look like:
2409 	 * (uN - u(N-1) + kN) + ... + (u1 - u0 + k1) + (u0 - 0 + k0)
2410 	 * Which simplifies to:
2411 	 * uN + kN + kN-1 + ... + k1 + k0
2412 	 * Note however, if we stop before we reach the ORIGIN we get:
2413 	 * uN + kN + kN-1 + ... + kM - uM-1
2414 	 */
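	/*
	 * Worked example (hypothetical units): for snapshots s0..s2 with
	 * u = {10, 4, 6} and k = {3, 5, 2}, the births telescope to
	 * (6 - 4 + 2) + (4 - 10 + 5) + (10 - 0 + 3) = u2 + k2 + k1 + k0 = 16.
	 */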
2415 	pa->used = origin_ds->ds_phys->ds_used_bytes;
2416 	pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
2417 	pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
2418 	for (snap = list_head(&pa->shared_snaps); snap;
2419 	    snap = list_next(&pa->shared_snaps, snap)) {
2420 		uint64_t val, dlused, dlcomp, dluncomp;
2421 		dsl_dataset_t *ds = snap->ds;
2422 
2423 		/* Check that the snapshot name does not conflict */
2424 		VERIFY(0 == dsl_dataset_get_snapname(ds));
2425 		err = dsl_dataset_snap_lookup(hds, ds->ds_snapname, &val);
2426 		if (err == 0) {
2427 			err = EEXIST;
2428 			goto out;
2429 		}
2430 		if (err != ENOENT)
2431 			goto out;
2432 
2433 		/* The very first snapshot does not have a deadlist */
2434 		if (ds->ds_phys->ds_prev_snap_obj == 0)
2435 			continue;
2436 
2437 		if (err = bplist_space(&ds->ds_deadlist,
2438 		    &dlused, &dlcomp, &dluncomp))
2439 			goto out;
2440 		pa->used += dlused;
2441 		pa->comp += dlcomp;
2442 		pa->uncomp += dluncomp;
2443 	}
2444 
2445 	/*
2446 	 * If we are a clone of a clone then we never reached ORIGIN,
2447 	 * so we need to subtract out the clone origin's used space.
2448 	 */
2449 	if (pa->origin_origin) {
2450 		pa->used -= pa->origin_origin->ds_phys->ds_used_bytes;
2451 		pa->comp -= pa->origin_origin->ds_phys->ds_compressed_bytes;
2452 		pa->uncomp -= pa->origin_origin->ds_phys->ds_uncompressed_bytes;
2453 	}
2454 
2455 	/* Check that there is enough space here */
2456 	err = dsl_dir_transfer_possible(origin_ds->ds_dir, hds->ds_dir,
2457 	    pa->used);
2458 	if (err)
2459 		return (err);
2460 
2461 	/*
2462 	 * Compute the amounts of space that will be used by snapshots
2463 	 * after the promotion (for both origin and clone).  For each,
2464 	 * it is the amount of space that will be on all of their
2465 	 * deadlists (that was not born before their new origin).
2466 	 */
2467 	if (hds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2468 		uint64_t space;
2469 
2470 		/*
2471 		 * Note, typically this will not be a clone of a clone,
2472 		 * so snap->ds->ds_origin_txg will be < TXG_INITIAL, so
2473 		 * these snaplist_space() -> bplist_space_birthrange()
2474 		 * calls will be fast because they do not have to
2475 		 * iterate over all bps.
2476 		 */
2477 		snap = list_head(&pa->origin_snaps);
2478 		err = snaplist_space(&pa->shared_snaps,
2479 		    snap->ds->ds_origin_txg, &pa->cloneusedsnap);
2480 		if (err)
2481 			return (err);
2482 
2483 		err = snaplist_space(&pa->clone_snaps,
2484 		    snap->ds->ds_origin_txg, &space);
2485 		if (err)
2486 			return (err);
2487 		pa->cloneusedsnap += space;
2488 	}
2489 	if (origin_ds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
2490 		err = snaplist_space(&pa->origin_snaps,
2491 		    origin_ds->ds_phys->ds_creation_txg, &pa->originusedsnap);
2492 		if (err)
2493 			return (err);
2494 	}
2495 
2496 	return (0);
2497 out:
2498 	pa->err_ds = snap->ds->ds_snapname;
2499 	return (err);
2500 }
2501 
2502 static void
2503 dsl_dataset_promote_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
2504 {
2505 	dsl_dataset_t *hds = arg1;
2506 	struct promotearg *pa = arg2;
2507 	struct promotenode *snap = list_head(&pa->shared_snaps);
2508 	dsl_dataset_t *origin_ds = snap->ds;
2509 	dsl_dataset_t *origin_head;
2510 	dsl_dir_t *dd = hds->ds_dir;
2511 	dsl_pool_t *dp = hds->ds_dir->dd_pool;
2512 	dsl_dir_t *odd = NULL;
2513 	uint64_t oldnext_obj;
2514 	int64_t delta;
2515 
2516 	ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));
2517 
2518 	snap = list_head(&pa->origin_snaps);
2519 	origin_head = snap->ds;
2520 
2521 	/*
2522 	 * We need to explicitly open odd, since origin_ds's dd will be
2523 	 * changing.
2524 	 */
2525 	VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
2526 	    NULL, FTAG, &odd));
2527 
2528 	/* change origin's next snap */
2529 	dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
2530 	oldnext_obj = origin_ds->ds_phys->ds_next_snap_obj;
2531 	snap = list_tail(&pa->clone_snaps);
2532 	ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
2533 	origin_ds->ds_phys->ds_next_snap_obj = snap->ds->ds_object;
2534 
2535 	/* change the origin's next clone */
2536 	if (origin_ds->ds_phys->ds_next_clones_obj) {
2537 		remove_from_next_clones(origin_ds, snap->ds->ds_object, tx);
2538 		VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
2539 		    origin_ds->ds_phys->ds_next_clones_obj,
2540 		    oldnext_obj, tx));
2541 	}
2542 
2543 	/* change origin */
2544 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
2545 	ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
2546 	dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
2547 	hds->ds_origin_txg = origin_head->ds_origin_txg;
2548 	dmu_buf_will_dirty(odd->dd_dbuf, tx);
2549 	odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
2550 	origin_head->ds_origin_txg = origin_ds->ds_phys->ds_creation_txg;
2551 
2552 	/* move snapshots to this dir */
2553 	for (snap = list_head(&pa->shared_snaps); snap;
2554 	    snap = list_next(&pa->shared_snaps, snap)) {
2555 		dsl_dataset_t *ds = snap->ds;
2556 
2557 		/* unregister props as dsl_dir is changing */
2558 		if (ds->ds_objset) {
2559 			dmu_objset_evict(ds->ds_objset);
2560 			ds->ds_objset = NULL;
2561 		}
2562 		/* move snap name entry */
2563 		VERIFY(0 == dsl_dataset_get_snapname(ds));
2564 		VERIFY(0 == dsl_dataset_snap_remove(origin_head,
2565 		    ds->ds_snapname, tx));
2566 		VERIFY(0 == zap_add(dp->dp_meta_objset,
2567 		    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
2568 		    8, 1, &ds->ds_object, tx));
2569 		/* change containing dsl_dir */
2570 		dmu_buf_will_dirty(ds->ds_dbuf, tx);
2571 		ASSERT3U(ds->ds_phys->ds_dir_obj, ==, odd->dd_object);
2572 		ds->ds_phys->ds_dir_obj = dd->dd_object;
2573 		ASSERT3P(ds->ds_dir, ==, odd);
2574 		dsl_dir_close(ds->ds_dir, ds);
2575 		VERIFY(0 == dsl_dir_open_obj(dp, dd->dd_object,
2576 		    NULL, ds, &ds->ds_dir));
2577 
2578 		ASSERT3U(dsl_prop_numcb(ds), ==, 0);
2579 	}
2580 
2581 	/*
2582 	 * Change space accounting.
2583 	 * Note, pa->*usedsnap and dd_used_breakdown[SNAP] will either
2584 	 * both be valid, or both be 0 (resulting in delta == 0).  This
2585 	 * is true for each of {clone,origin} independently.
2586 	 */
2587 
2588 	delta = pa->cloneusedsnap -
2589 	    dd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
2590 	ASSERT3S(delta, >=, 0);
2591 	ASSERT3U(pa->used, >=, delta);
2592 	dsl_dir_diduse_space(dd, DD_USED_SNAP, delta, 0, 0, tx);
2593 	dsl_dir_diduse_space(dd, DD_USED_HEAD,
2594 	    pa->used - delta, pa->comp, pa->uncomp, tx);
2595 
2596 	delta = pa->originusedsnap -
2597 	    odd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
2598 	ASSERT3S(delta, <=, 0);
2599 	ASSERT3U(pa->used, >=, -delta);
2600 	dsl_dir_diduse_space(odd, DD_USED_SNAP, delta, 0, 0, tx);
2601 	dsl_dir_diduse_space(odd, DD_USED_HEAD,
2602 	    -pa->used - delta, -pa->comp, -pa->uncomp, tx);
2603 
2604 	origin_ds->ds_phys->ds_unique_bytes = pa->unique;
2605 
2606 	/* log history record */
2607 	spa_history_internal_log(LOG_DS_PROMOTE, dd->dd_pool->dp_spa, tx,
2608 	    cr, "dataset = %llu", hds->ds_object);
2609 
2610 	dsl_dir_close(odd, FTAG);
2611 }
2612 
2613 static char *snaplist_tag = "snaplist";
2614 /*
2615  * Make a list of dsl_dataset_t's for the snapshots between first_obj
2616  * (exclusive) and last_obj (inclusive).  The list will be in reverse
2617  * order (last_obj will be the list_head()).  If first_obj == 0, do all
2618  * snapshots back to this dataset's origin.
2619  */
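/*
 * E.g., for snapshots A -> B -> C (C newest), calling snaplist_make()
 * with first_obj = A and last_obj = C yields the list (C, B): C is the
 * list_head(), and A itself is excluded.
 */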
2620 static int
2621 snaplist_make(dsl_pool_t *dp, boolean_t own,
2622     uint64_t first_obj, uint64_t last_obj, list_t *l)
2623 {
2624 	uint64_t obj = last_obj;
2625 
2626 	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock));
2627 
2628 	list_create(l, sizeof (struct promotenode),
2629 	    offsetof(struct promotenode, link));
2630 
2631 	while (obj != first_obj) {
2632 		dsl_dataset_t *ds;
2633 		struct promotenode *snap;
2634 		int err;
2635 
2636 		if (own) {
2637 			err = dsl_dataset_own_obj(dp, obj,
2638 			    0, snaplist_tag, &ds);
2639 			if (err == 0)
2640 				dsl_dataset_make_exclusive(ds, snaplist_tag);
2641 		} else {
2642 			err = dsl_dataset_hold_obj(dp, obj, snaplist_tag, &ds);
2643 		}
2644 		if (err == ENOENT) {
2645 			/* lost race with snapshot destroy */
2646 			struct promotenode *last = list_tail(l);
2647 			ASSERT(obj != last->ds->ds_phys->ds_prev_snap_obj);
2648 			obj = last->ds->ds_phys->ds_prev_snap_obj;
2649 			continue;
2650 		} else if (err) {
2651 			return (err);
2652 		}
2653 
2654 		if (first_obj == 0)
2655 			first_obj = ds->ds_dir->dd_phys->dd_origin_obj;
2656 
2657 		snap = kmem_alloc(sizeof (struct promotenode), KM_SLEEP);
2658 		snap->ds = ds;
2659 		list_insert_tail(l, snap);
2660 		obj = ds->ds_phys->ds_prev_snap_obj;
2661 	}
2662 
2663 	return (0);
2664 }
2665 
2666 static int
2667 snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep)
2668 {
2669 	struct promotenode *snap;
2670 
2671 	*spacep = 0;
2672 	for (snap = list_head(l); snap; snap = list_next(l, snap)) {
2673 		uint64_t used;
2674 		int err = bplist_space_birthrange(&snap->ds->ds_deadlist,
2675 		    mintxg, UINT64_MAX, &used);
2676 		if (err)
2677 			return (err);
2678 		*spacep += used;
2679 	}
2680 	return (0);
2681 }
2682 
2683 static void
2684 snaplist_destroy(list_t *l, boolean_t own)
2685 {
2686 	struct promotenode *snap;
2687 
2688 	if (!l || !list_link_active(&l->list_head))
2689 		return;
2690 
2691 	while ((snap = list_tail(l)) != NULL) {
2692 		list_remove(l, snap);
2693 		if (own)
2694 			dsl_dataset_disown(snap->ds, snaplist_tag);
2695 		else
2696 			dsl_dataset_rele(snap->ds, snaplist_tag);
2697 		kmem_free(snap, sizeof (struct promotenode));
2698 	}
2699 	list_destroy(l);
2700 }
2701 
2702 /*
2703  * Promote a clone.  Nomenclature note:
2704  * "clone" or "cds": the original clone which is being promoted
2705  * "origin" or "ods": the snapshot which is originally clone's origin
2706  * "origin head" or "ohds": the dataset which is the head
2707  * (filesystem/volume) for the origin
2708  * "origin origin": the origin of the origin's filesystem (typically
2709  * NULL, indicating that the clone is not a clone of a clone).
2710  */
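/*
 * For illustration, in the common case (not a clone of a clone):
 *
 *	before:	ohds ---- @snapA ---- @origin
 *	                                 \
 *	                                  cds
 *
 *	after:	cds ---- @snapA ---- @origin
 *	                                \
 *	                                 ohds
 *
 * @origin and all older snapshots move into the promoted clone's
 * dsl_dir, and the origin head becomes a clone of @origin.
 */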
2711 int
2712 dsl_dataset_promote(const char *name, char *conflsnap)
2713 {
2714 	dsl_dataset_t *ds;
2715 	dsl_dir_t *dd;
2716 	dsl_pool_t *dp;
2717 	dmu_object_info_t doi;
2718 	struct promotearg pa = { 0 };
2719 	struct promotenode *snap;
2720 	int err;
2721 
2722 	err = dsl_dataset_hold(name, FTAG, &ds);
2723 	if (err)
2724 		return (err);
2725 	dd = ds->ds_dir;
2726 	dp = dd->dd_pool;
2727 
2728 	err = dmu_object_info(dp->dp_meta_objset,
2729 	    ds->ds_phys->ds_snapnames_zapobj, &doi);
2730 	if (err) {
2731 		dsl_dataset_rele(ds, FTAG);
2732 		return (err);
2733 	}
2734 
2735 	if (dsl_dataset_is_snapshot(ds) || dd->dd_phys->dd_origin_obj == 0) {
2736 		dsl_dataset_rele(ds, FTAG);
2737 		return (EINVAL);
2738 	}
2739 
2740 	/*
2741 	 * We are going to inherit all the snapshots taken before our
2742 	 * origin (i.e., our new origin will be our parent's origin).
2743 	 * Take ownership of them so that we can rename them into our
2744 	 * namespace.
2745 	 */
2746 	rw_enter(&dp->dp_config_rwlock, RW_READER);
2747 
2748 	err = snaplist_make(dp, B_TRUE, 0, dd->dd_phys->dd_origin_obj,
2749 	    &pa.shared_snaps);
2750 	if (err != 0)
2751 		goto out;
2752 
2753 	err = snaplist_make(dp, B_FALSE, 0, ds->ds_object, &pa.clone_snaps);
2754 	if (err != 0)
2755 		goto out;
2756 
2757 	snap = list_head(&pa.shared_snaps);
2758 	ASSERT3U(snap->ds->ds_object, ==, dd->dd_phys->dd_origin_obj);
2759 	err = snaplist_make(dp, B_FALSE, dd->dd_phys->dd_origin_obj,
2760 	    snap->ds->ds_dir->dd_phys->dd_head_dataset_obj, &pa.origin_snaps);
2761 	if (err != 0)
2762 		goto out;
2763 
2764 	if (dsl_dir_is_clone(snap->ds->ds_dir)) {
2765 		err = dsl_dataset_own_obj(dp,
2766 		    snap->ds->ds_dir->dd_phys->dd_origin_obj,
2767 		    0, FTAG, &pa.origin_origin);
2768 		if (err != 0)
2769 			goto out;
2770 	}
2771 
2772 out:
2773 	rw_exit(&dp->dp_config_rwlock);
2774 
2775 	/*
2776 	 * Add in 128x the snapnames zapobj size, since we will be moving
2777 	 * a bunch of snapnames to the promoted ds, and dirtying their
2778 	 * bonus buffers.
2779 	 */
2780 	if (err == 0) {
2781 		err = dsl_sync_task_do(dp, dsl_dataset_promote_check,
2782 		    dsl_dataset_promote_sync, ds, &pa,
2783 		    2 + 2 * doi.doi_physical_blocks_512);
2784 		if (err && pa.err_ds && conflsnap)
2785 			(void) strncpy(conflsnap, pa.err_ds, MAXNAMELEN);
2786 	}
2787 
2788 	snaplist_destroy(&pa.shared_snaps, B_TRUE);
2789 	snaplist_destroy(&pa.clone_snaps, B_FALSE);
2790 	snaplist_destroy(&pa.origin_snaps, B_FALSE);
2791 	if (pa.origin_origin)
2792 		dsl_dataset_disown(pa.origin_origin, FTAG);
2793 	dsl_dataset_rele(ds, FTAG);
2794 	return (err);
2795 }
2796 
2797 struct cloneswaparg {
2798 	dsl_dataset_t *cds; /* clone dataset */
2799 	dsl_dataset_t *ohds; /* origin's head dataset */
2800 	boolean_t force;
2801 	int64_t unused_refres_delta; /* change in unconsumed refreservation */
2802 };
2803 
2804 /* ARGSUSED */
2805 static int
2806 dsl_dataset_clone_swap_check(void *arg1, void *arg2, dmu_tx_t *tx)
2807 {
2808 	struct cloneswaparg *csa = arg1;
2809 
2810 	/* they should both be heads */
2811 	if (dsl_dataset_is_snapshot(csa->cds) ||
2812 	    dsl_dataset_is_snapshot(csa->ohds))
2813 		return (EINVAL);
2814 
2815 	/* the branch point should be just before them */
2816 	if (csa->cds->ds_prev != csa->ohds->ds_prev)
2817 		return (EINVAL);
2818 
2819 	/* cds should be the clone (unless they are unrelated) */
2820 	if (csa->cds->ds_prev != NULL &&
2821 	    csa->cds->ds_prev != csa->cds->ds_dir->dd_pool->dp_origin_snap &&
2822 	    csa->ohds->ds_object !=
2823 	    csa->cds->ds_prev->ds_phys->ds_next_snap_obj)
2824 		return (EINVAL);
2825 
2826 	/* the clone should be a child of the origin */
2827 	if (csa->cds->ds_dir->dd_parent != csa->ohds->ds_dir)
2828 		return (EINVAL);
2829 
2830 	/* ohds shouldn't be modified unless 'force' */
2831 	if (!csa->force && dsl_dataset_modified_since_lastsnap(csa->ohds))
2832 		return (ETXTBSY);
2833 
2834 	/* adjust amount of any unconsumed refreservation */
2835 	csa->unused_refres_delta =
2836 	    (int64_t)MIN(csa->ohds->ds_reserved,
2837 	    csa->ohds->ds_phys->ds_unique_bytes) -
2838 	    (int64_t)MIN(csa->ohds->ds_reserved,
2839 	    csa->cds->ds_phys->ds_unique_bytes);
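	/*
	 * E.g. (hypothetical numbers): with refreservation=100M, ohds
	 * unique=70M and cds unique=40M, the delta is MIN(100, 70) -
	 * MIN(100, 40) = 30M more unconsumed refreservation after the swap.
	 */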
2840 
2841 	if (csa->unused_refres_delta > 0 &&
2842 	    csa->unused_refres_delta >
2843 	    dsl_dir_space_available(csa->ohds->ds_dir, NULL, 0, TRUE))
2844 		return (ENOSPC);
2845 
2846 	if (csa->ohds->ds_quota != 0 &&
2847 	    csa->cds->ds_phys->ds_unique_bytes > csa->ohds->ds_quota)
2848 		return (EDQUOT);
2849 
2850 	return (0);
2851 }
2852 
2853 /* ARGSUSED */
2854 static void
2855 dsl_dataset_clone_swap_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
2856 {
2857 	struct cloneswaparg *csa = arg1;
2858 	dsl_pool_t *dp = csa->cds->ds_dir->dd_pool;
2859 
2860 	ASSERT(csa->cds->ds_reserved == 0);
2861 	ASSERT(csa->ohds->ds_quota == 0 ||
2862 	    csa->cds->ds_phys->ds_unique_bytes <= csa->ohds->ds_quota);
2863 
2864 	dmu_buf_will_dirty(csa->cds->ds_dbuf, tx);
2865 	dmu_buf_will_dirty(csa->ohds->ds_dbuf, tx);
2866 
2867 	if (csa->cds->ds_objset != NULL) {
2868 		dmu_objset_evict(csa->cds->ds_objset);
2869 		csa->cds->ds_objset = NULL;
2870 	}
2871 
2872 	if (csa->ohds->ds_objset != NULL) {
2873 		dmu_objset_evict(csa->ohds->ds_objset);
2874 		csa->ohds->ds_objset = NULL;
2875 	}
2876 
2877 	/*
2878 	 * Reset origin's unique bytes, if it exists.
2879 	 */
2880 	if (csa->cds->ds_prev) {
2881 		dsl_dataset_t *origin = csa->cds->ds_prev;
2882 		dmu_buf_will_dirty(origin->ds_dbuf, tx);
2883 		VERIFY(0 == bplist_space_birthrange(&csa->cds->ds_deadlist,
2884 		    origin->ds_phys->ds_prev_snap_txg, UINT64_MAX,
2885 		    &origin->ds_phys->ds_unique_bytes));
2886 	}
2887 
2888 	/* swap blkptrs */
2889 	{
2890 		blkptr_t tmp;
2891 		tmp = csa->ohds->ds_phys->ds_bp;
2892 		csa->ohds->ds_phys->ds_bp = csa->cds->ds_phys->ds_bp;
2893 		csa->cds->ds_phys->ds_bp = tmp;
2894 	}
2895 
2896 	/* set dd_*_bytes */
2897 	{
2898 		int64_t dused, dcomp, duncomp;
2899 		uint64_t cdl_used, cdl_comp, cdl_uncomp;
2900 		uint64_t odl_used, odl_comp, odl_uncomp;
2901 
2902 		ASSERT3U(csa->cds->ds_dir->dd_phys->
2903 		    dd_used_breakdown[DD_USED_SNAP], ==, 0);
2904 
2905 		VERIFY(0 == bplist_space(&csa->cds->ds_deadlist, &cdl_used,
2906 		    &cdl_comp, &cdl_uncomp));
2907 		VERIFY(0 == bplist_space(&csa->ohds->ds_deadlist, &odl_used,
2908 		    &odl_comp, &odl_uncomp));
2909 
2910 		dused = csa->cds->ds_phys->ds_used_bytes + cdl_used -
2911 		    (csa->ohds->ds_phys->ds_used_bytes + odl_used);
2912 		dcomp = csa->cds->ds_phys->ds_compressed_bytes + cdl_comp -
2913 		    (csa->ohds->ds_phys->ds_compressed_bytes + odl_comp);
2914 		duncomp = csa->cds->ds_phys->ds_uncompressed_bytes +
2915 		    cdl_uncomp -
2916 		    (csa->ohds->ds_phys->ds_uncompressed_bytes + odl_uncomp);
2917 
2918 		dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_HEAD,
2919 		    dused, dcomp, duncomp, tx);
2920 		dsl_dir_diduse_space(csa->cds->ds_dir, DD_USED_HEAD,
2921 		    -dused, -dcomp, -duncomp, tx);
2922 
2923 		/*
2924 		 * The difference in the space used by snapshots is the
2925 		 * difference in snapshot space due to the head's
2926 		 * deadlist (since that's the only thing that's
2927 		 * changing that affects the snapused).
2928 		 */
2929 		VERIFY(0 == bplist_space_birthrange(&csa->cds->ds_deadlist,
2930 		    csa->ohds->ds_origin_txg, UINT64_MAX, &cdl_used));
2931 		VERIFY(0 == bplist_space_birthrange(&csa->ohds->ds_deadlist,
2932 		    csa->ohds->ds_origin_txg, UINT64_MAX, &odl_used));
2933 		dsl_dir_transfer_space(csa->ohds->ds_dir, cdl_used - odl_used,
2934 		    DD_USED_HEAD, DD_USED_SNAP, tx);
2935 	}
2936 
2937 #define	SWITCH64(x, y) \
2938 	{ \
2939 		uint64_t __tmp = (x); \
2940 		(x) = (y); \
2941 		(y) = __tmp; \
2942 	}
2943 
2944 	/* swap ds_*_bytes */
2945 	SWITCH64(csa->ohds->ds_phys->ds_used_bytes,
2946 	    csa->cds->ds_phys->ds_used_bytes);
2947 	SWITCH64(csa->ohds->ds_phys->ds_compressed_bytes,
2948 	    csa->cds->ds_phys->ds_compressed_bytes);
2949 	SWITCH64(csa->ohds->ds_phys->ds_uncompressed_bytes,
2950 	    csa->cds->ds_phys->ds_uncompressed_bytes);
2951 	SWITCH64(csa->ohds->ds_phys->ds_unique_bytes,
2952 	    csa->cds->ds_phys->ds_unique_bytes);
2953 
2954 	/* apply any parent delta for change in unconsumed refreservation */
2955 	dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_REFRSRV,
2956 	    csa->unused_refres_delta, 0, 0, tx);
2957 
2958 	/* swap deadlists */
2959 	bplist_close(&csa->cds->ds_deadlist);
2960 	bplist_close(&csa->ohds->ds_deadlist);
2961 	SWITCH64(csa->ohds->ds_phys->ds_deadlist_obj,
2962 	    csa->cds->ds_phys->ds_deadlist_obj);
2963 	VERIFY(0 == bplist_open(&csa->cds->ds_deadlist, dp->dp_meta_objset,
2964 	    csa->cds->ds_phys->ds_deadlist_obj));
2965 	VERIFY(0 == bplist_open(&csa->ohds->ds_deadlist, dp->dp_meta_objset,
2966 	    csa->ohds->ds_phys->ds_deadlist_obj));
2967 
2968 	dsl_pool_ds_clone_swapped(csa->ohds, csa->cds, tx);
2969 }
2970 
2971 /*
2972  * Swap 'clone' with its origin head datasets.  Used at the end of "zfs
2973  * recv" into an existing fs to swizzle the file system to the new
2974  * version, and by "zfs rollback".  Can also be used to swap two
2975  * independent head datasets if neither has any snapshots.
2976  */
2977 int
2978 dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
2979     boolean_t force)
2980 {
2981 	struct cloneswaparg csa;
2982 	int error;
2983 
2984 	ASSERT(clone->ds_owner);
2985 	ASSERT(origin_head->ds_owner);
2986 retry:
2987 	/* Need exclusive access for the swap */
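	/*
	 * Grab one lock blocking and try-grab the other, restarting in
	 * the opposite order on failure, so that two concurrent swaps
	 * cannot deadlock against each other.
	 */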
2988 	rw_enter(&clone->ds_rwlock, RW_WRITER);
2989 	if (!rw_tryenter(&origin_head->ds_rwlock, RW_WRITER)) {
2990 		rw_exit(&clone->ds_rwlock);
2991 		rw_enter(&origin_head->ds_rwlock, RW_WRITER);
2992 		if (!rw_tryenter(&clone->ds_rwlock, RW_WRITER)) {
2993 			rw_exit(&origin_head->ds_rwlock);
2994 			goto retry;
2995 		}
2996 	}
2997 	csa.cds = clone;
2998 	csa.ohds = origin_head;
2999 	csa.force = force;
3000 	error = dsl_sync_task_do(clone->ds_dir->dd_pool,
3001 	    dsl_dataset_clone_swap_check,
3002 	    dsl_dataset_clone_swap_sync, &csa, NULL, 9);
3003 	return (error);
3004 }
3005 
3006 /*
3007  * Given a pool name and a dataset object number in that pool,
3008  * return the name of that dataset.
3009  */
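/*
 * A minimal usage sketch (names hypothetical):
 *
 *	char buf[MAXNAMELEN];
 *	if (dsl_dsobj_to_dsname("tank", dsobj, buf) == 0)
 *		buf now holds a name such as "tank/home@yesterday"
 *
 * The caller's buffer is assumed to be at least MAXNAMELEN bytes.
 */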
3010 int
3011 dsl_dsobj_to_dsname(char *pname, uint64_t obj, char *buf)
3012 {
3013 	spa_t *spa;
3014 	dsl_pool_t *dp;
3015 	dsl_dataset_t *ds;
3016 	int error;
3017 
3018 	if ((error = spa_open(pname, &spa, FTAG)) != 0)
3019 		return (error);
3020 	dp = spa_get_dsl(spa);
3021 	rw_enter(&dp->dp_config_rwlock, RW_READER);
3022 	if ((error = dsl_dataset_hold_obj(dp, obj, FTAG, &ds)) == 0) {
3023 		dsl_dataset_name(ds, buf);
3024 		dsl_dataset_rele(ds, FTAG);
3025 	}
3026 	rw_exit(&dp->dp_config_rwlock);
3027 	spa_close(spa, FTAG);
3028 
3029 	return (error);
3030 }
3031 
3032 int
3033 dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
3034     uint64_t asize, uint64_t inflight, uint64_t *used, uint64_t *ref_rsrv)
3035 {
3036 	int error = 0;
3037 
3038 	ASSERT3S(asize, >, 0);
3039 
3040 	/*
3041 	 * *ref_rsrv is the portion of asize that will come from any
3042 	 * unconsumed refreservation space.
3043 	 */
3044 	*ref_rsrv = 0;
3045 
3046 	mutex_enter(&ds->ds_lock);
3047 	/*
3048 	 * Make a space adjustment for reserved bytes.
3049 	 */
3050 	if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes) {
3051 		ASSERT3U(*used, >=,
3052 		    ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3053 		*used -= (ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
3054 		*ref_rsrv =
3055 		    asize - MIN(asize, parent_delta(ds, asize + inflight));
3056 	}
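	/*
	 * Worked example (hypothetical): with refreservation=100M and
	 * unique=60M, the 40M already set aside for the reservation is
	 * backed out of *used above; and when parent_delta() returns 0
	 * (the write fits entirely under the reservation),
	 * *ref_rsrv == asize.
	 */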
3057 
3058 	if (!check_quota || ds->ds_quota == 0) {
3059 		mutex_exit(&ds->ds_lock);
3060 		return (0);
3061 	}
3062 	/*
3063 	 * If they are requesting more space, and our current estimate
3064 	 * is over quota, they get to try again unless the actual
3065 	 * on-disk usage is over quota and there are no pending changes (which
3066 	 * may free up space for us).
3067 	 */
3068 	if (ds->ds_phys->ds_used_bytes + inflight >= ds->ds_quota) {
3069 		if (inflight > 0 || ds->ds_phys->ds_used_bytes < ds->ds_quota)
3070 			error = ERESTART;
3071 		else
3072 			error = EDQUOT;
3073 	}
3074 	mutex_exit(&ds->ds_lock);
3075 
3076 	return (error);
3077 }
3078 
3079 /* ARGSUSED */
3080 static int
3081 dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
3082 {
3083 	dsl_dataset_t *ds = arg1;
3084 	uint64_t *quotap = arg2;
3085 	uint64_t new_quota = *quotap;
3086 
3087 	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_REFQUOTA)
3088 		return (ENOTSUP);
3089 
3090 	if (new_quota == 0)
3091 		return (0);
3092 
3093 	if (new_quota < ds->ds_phys->ds_used_bytes ||
3094 	    new_quota < ds->ds_reserved)
3095 		return (ENOSPC);
3096 
3097 	return (0);
3098 }
3099 
3100 /* ARGSUSED */
3101 void
3102 dsl_dataset_set_quota_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
3103 {
3104 	dsl_dataset_t *ds = arg1;
3105 	uint64_t *quotap = arg2;
3106 	uint64_t new_quota = *quotap;
3107 
3108 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
3109 
3110 	ds->ds_quota = new_quota;
3111 
3112 	dsl_dir_prop_set_uint64_sync(ds->ds_dir, "refquota", new_quota, cr, tx);
3113 
3114 	spa_history_internal_log(LOG_DS_REFQUOTA, ds->ds_dir->dd_pool->dp_spa,
3115 	    tx, cr, "%lld dataset = %llu ",
3116 	    (longlong_t)new_quota, ds->ds_object);
3117 }
3118 
3119 int
3120 dsl_dataset_set_quota(const char *dsname, uint64_t quota)
3121 {
3122 	dsl_dataset_t *ds;
3123 	int err;
3124 
3125 	err = dsl_dataset_hold(dsname, FTAG, &ds);
3126 	if (err)
3127 		return (err);
3128 
3129 	if (quota != ds->ds_quota) {
3130 		/*
3131 		 * If someone removes a file, then tries to set the quota, we
3132 		 * want to make sure the file freeing takes effect.
3133 		 */
3134 		txg_wait_open(ds->ds_dir->dd_pool, 0);
3135 
3136 		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3137 		    dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
3138 		    ds, &quota, 0);
3139 	}
3140 	dsl_dataset_rele(ds, FTAG);
3141 	return (err);
3142 }
3143 
3144 static int
3145 dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
3146 {
3147 	dsl_dataset_t *ds = arg1;
3148 	uint64_t *reservationp = arg2;
3149 	uint64_t new_reservation = *reservationp;
3150 	uint64_t unique;
3151 
3152 	if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
3153 	    SPA_VERSION_REFRESERVATION)
3154 		return (ENOTSUP);
3155 
3156 	if (dsl_dataset_is_snapshot(ds))
3157 		return (EINVAL);
3158 
3159 	/*
3160 	 * If we are doing the preliminary check in open context, the
3161 	 * space estimates may be inaccurate.
3162 	 */
3163 	if (!dmu_tx_is_syncing(tx))
3164 		return (0);
3165 
3166 	mutex_enter(&ds->ds_lock);
3167 	unique = dsl_dataset_unique(ds);
3168 	mutex_exit(&ds->ds_lock);
3169 
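	/*
	 * Only reservation in excess of the unique space consumes pool
	 * space.  E.g. (hypothetical), with 5G unique, raising the
	 * reservation from 2G to 8G requires MAX(5, 8) - MAX(5, 2) = 3G
	 * of additional available space.
	 */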
3170 	if (MAX(unique, new_reservation) > MAX(unique, ds->ds_reserved)) {
3171 		uint64_t delta = MAX(unique, new_reservation) -
3172 		    MAX(unique, ds->ds_reserved);
3173 
3174 		if (delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
3175 			return (ENOSPC);
3176 		if (ds->ds_quota > 0 &&
3177 		    new_reservation > ds->ds_quota)
3178 			return (ENOSPC);
3179 	}
3180 
3181 	return (0);
3182 }
3183 
3184 /* ARGSUSED */
3185 static void
3186 dsl_dataset_set_reservation_sync(void *arg1, void *arg2, cred_t *cr,
3187     dmu_tx_t *tx)
3188 {
3189 	dsl_dataset_t *ds = arg1;
3190 	uint64_t *reservationp = arg2;
3191 	uint64_t new_reservation = *reservationp;
3192 	uint64_t unique;
3193 	int64_t delta;
3194 
3195 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
3196 
3197 	mutex_enter(&ds->ds_dir->dd_lock);
3198 	mutex_enter(&ds->ds_lock);
3199 	unique = dsl_dataset_unique(ds);
3200 	delta = MAX(0, (int64_t)(new_reservation - unique)) -
3201 	    MAX(0, (int64_t)(ds->ds_reserved - unique));
3202 	ds->ds_reserved = new_reservation;
3203 	mutex_exit(&ds->ds_lock);
3204 
3205 	dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
3206 	mutex_exit(&ds->ds_dir->dd_lock);
3207 	dsl_dir_prop_set_uint64_sync(ds->ds_dir, "refreservation",
3208 	    new_reservation, cr, tx);
3209 
3210 	spa_history_internal_log(LOG_DS_REFRESERV,
3211 	    ds->ds_dir->dd_pool->dp_spa, tx, cr, "%lld dataset = %llu",
3212 	    (longlong_t)new_reservation, ds->ds_object);
3213 }
3214 
3215 int
3216 dsl_dataset_set_reservation(const char *dsname, uint64_t reservation)
3217 {
3218 	dsl_dataset_t *ds;
3219 	int err;
3220 
3221 	err = dsl_dataset_hold(dsname, FTAG, &ds);
3222 	if (err)
3223 		return (err);
3224 
3225 	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
3226 	    dsl_dataset_set_reservation_check,
3227 	    dsl_dataset_set_reservation_sync, ds, &reservation, 0);
3228 	dsl_dataset_rele(ds, FTAG);
3229 	return (err);
3230 }
3231 
3232 struct dsl_ds_holdarg {
3233 	dsl_sync_task_group_t *dstg;
3234 	char *htag;
3235 	char *snapname;
3236 	boolean_t recursive;
3237 	boolean_t gotone;
3238 	boolean_t temphold;
3239 	char failed[MAXPATHLEN];
3240 };
3241 
3242 static int
3243 dsl_dataset_user_hold_check(void *arg1, void *arg2, dmu_tx_t *tx)
3244 {
3245 	dsl_dataset_t *ds = arg1;
3246 	struct dsl_ds_holdarg *ha = arg2;
3247 	char *htag = ha->htag;
3248 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3249 	int error = 0;
3250 
3251 	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3252 		return (ENOTSUP);
3253 
3254 	if (!dsl_dataset_is_snapshot(ds))
3255 		return (EINVAL);
3256 
3257 	/* tags must be unique */
3258 	mutex_enter(&ds->ds_lock);
3259 	if (ds->ds_phys->ds_userrefs_obj) {
		uint64_t tmp;

		/* the looked-up value is discarded; only existence matters */
3260 		error = zap_lookup(mos, ds->ds_phys->ds_userrefs_obj, htag,
3261 		    8, 1, &tmp);
3262 		if (error == 0)
3263 			error = EEXIST;
3264 		else if (error == ENOENT)
3265 			error = 0;
3266 	}
3267 	mutex_exit(&ds->ds_lock);
3268 
3269 	return (error);
3270 }
3271 
3272 static void
3273 dsl_dataset_user_hold_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
3274 {
3275 	dsl_dataset_t *ds = arg1;
3276 	struct dsl_ds_holdarg *ha = arg2;
3277 	char *htag = ha->htag;
3278 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
3279 	objset_t *mos = dp->dp_meta_objset;
3280 	time_t now = gethrestime_sec();
3281 	uint64_t zapobj;
3282 
3283 	mutex_enter(&ds->ds_lock);
3284 	if (ds->ds_phys->ds_userrefs_obj == 0) {
3285 		/*
3286 		 * This is the first user hold for this dataset.  Create
3287 		 * the userrefs zap object.
3288 		 */
3289 		dmu_buf_will_dirty(ds->ds_dbuf, tx);
3290 		zapobj = ds->ds_phys->ds_userrefs_obj =
3291 		    zap_create(mos, DMU_OT_USERREFS, DMU_OT_NONE, 0, tx);
3292 	} else {
3293 		zapobj = ds->ds_phys->ds_userrefs_obj;
3294 	}
3295 	ds->ds_userrefs++;
3296 	mutex_exit(&ds->ds_lock);
3297 
3298 	VERIFY(0 == zap_add(mos, zapobj, htag, 8, 1, &now, tx));
3299 
3300 	if (ha->temphold) {
3301 		VERIFY(0 == dsl_pool_user_hold(dp, ds->ds_object,
3302 		    htag, &now, tx));
3303 	}
3304 
3305 	spa_history_internal_log(LOG_DS_USER_HOLD,
3306 	    dp->dp_spa, tx, cr, "<%s> temp = %d dataset = %llu", htag,
3307 	    (int)ha->temphold, ds->ds_object);
3308 }
3309 
3310 static int
3311 dsl_dataset_user_hold_one(char *dsname, void *arg)
3312 {
3313 	struct dsl_ds_holdarg *ha = arg;
3314 	dsl_dataset_t *ds;
3315 	int error;
3316 	char *name;
3317 
3318 	/* alloc a buffer to hold dsname@snapname plus terminating NULL */
3319 	name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3320 	error = dsl_dataset_hold(name, ha->dstg, &ds);
3321 	strfree(name);
3322 	if (error == 0) {
3323 		ha->gotone = B_TRUE;
3324 		dsl_sync_task_create(ha->dstg, dsl_dataset_user_hold_check,
3325 		    dsl_dataset_user_hold_sync, ds, ha, 0);
3326 	} else if (error == ENOENT && ha->recursive) {
3327 		error = 0;
3328 	} else {
3329 		(void) strcpy(ha->failed, dsname);
3330 	}
3331 	return (error);
3332 }
3333 
3334 int
3335 dsl_dataset_user_hold(char *dsname, char *snapname, char *htag,
3336     boolean_t recursive, boolean_t temphold)
3337 {
3338 	struct dsl_ds_holdarg *ha;
3339 	dsl_sync_task_t *dst;
3340 	spa_t *spa;
3341 	int error;
3342 
3343 	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3344 
3345 	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3346 
3347 	error = spa_open(dsname, &spa, FTAG);
3348 	if (error) {
3349 		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3350 		return (error);
3351 	}
3352 
3353 	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
3354 	ha->htag = htag;
3355 	ha->snapname = snapname;
3356 	ha->recursive = recursive;
3357 	ha->temphold = temphold;
3358 	if (recursive) {
3359 		error = dmu_objset_find(dsname, dsl_dataset_user_hold_one,
3360 		    ha, DS_FIND_CHILDREN);
3361 	} else {
3362 		error = dsl_dataset_user_hold_one(dsname, ha);
3363 	}
3364 	if (error == 0)
3365 		error = dsl_sync_task_group_wait(ha->dstg);
3366 
3367 	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
3368 	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
3369 		dsl_dataset_t *ds = dst->dst_arg1;
3370 
3371 		if (dst->dst_err) {
3372 			dsl_dataset_name(ds, ha->failed);
3373 			*strchr(ha->failed, '@') = '\0';
3374 		}
3375 		dsl_dataset_rele(ds, ha->dstg);
3376 	}
3377 
3378 	if (error == 0 && recursive && !ha->gotone)
3379 		error = ENOENT;
3380 
3381 	if (error)
3382 		(void) strcpy(dsname, ha->failed);
3383 
3384 	dsl_sync_task_group_destroy(ha->dstg);
3385 	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3386 	spa_close(spa, FTAG);
3387 	return (error);
3388 }
3389 
3390 struct dsl_ds_releasearg {
3391 	dsl_dataset_t *ds;
3392 	const char *htag;
3393 	boolean_t own;		/* do we own or just hold ds? */
3394 };
3395 
3396 static int
3397 dsl_dataset_release_might_destroy(dsl_dataset_t *ds, const char *htag,
3398     boolean_t *might_destroy)
3399 {
3400 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
3401 	uint64_t zapobj;
3402 	uint64_t tmp;
3403 	int error;
3404 
3405 	*might_destroy = B_FALSE;
3406 
3407 	mutex_enter(&ds->ds_lock);
3408 	zapobj = ds->ds_phys->ds_userrefs_obj;
3409 	if (zapobj == 0) {
3410 		/* The tag can't possibly exist */
3411 		mutex_exit(&ds->ds_lock);
3412 		return (ESRCH);
3413 	}
3414 
3415 	/* Make sure the tag exists */
3416 	error = zap_lookup(mos, zapobj, htag, 8, 1, &tmp);
3417 	if (error) {
3418 		mutex_exit(&ds->ds_lock);
3419 		if (error == ENOENT)
3420 			error = ESRCH;
3421 		return (error);
3422 	}
3423 
3424 	if (ds->ds_userrefs == 1 && ds->ds_phys->ds_num_children == 1 &&
3425 	    DS_IS_DEFER_DESTROY(ds))
3426 		*might_destroy = B_TRUE;
3427 
3428 	mutex_exit(&ds->ds_lock);
3429 	return (0);
3430 }
3431 
3432 static int
3433 dsl_dataset_user_release_check(void *arg1, void *tag, dmu_tx_t *tx)
3434 {
3435 	struct dsl_ds_releasearg *ra = arg1;
3436 	dsl_dataset_t *ds = ra->ds;
3437 	boolean_t might_destroy;
3438 	int error;
3439 
3440 	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
3441 		return (ENOTSUP);
3442 
3443 	error = dsl_dataset_release_might_destroy(ds, ra->htag, &might_destroy);
3444 	if (error)
3445 		return (error);
3446 
3447 	if (might_destroy) {
3448 		struct dsl_ds_destroyarg dsda = {0};
3449 
3450 		if (dmu_tx_is_syncing(tx)) {
3451 			/*
3452 			 * If we're not prepared to remove the snapshot,
3453 			 * we can't allow the release to happen right now.
3454 			 */
3455 			if (!ra->own)
3456 				return (EBUSY);
3457 			if (ds->ds_objset) {
3458 				dmu_objset_evict(ds->ds_objset);
3459 				ds->ds_objset = NULL;
3460 			}
3461 		}
3462 		dsda.ds = ds;
3463 		dsda.releasing = B_TRUE;
3464 		return (dsl_dataset_destroy_check(&dsda, tag, tx));
3465 	}
3466 
3467 	return (0);
3468 }
3469 
3470 static void
3471 dsl_dataset_user_release_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
3472 {
3473 	struct dsl_ds_releasearg *ra = arg1;
3474 	dsl_dataset_t *ds = ra->ds;
3475 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
3476 	objset_t *mos = dp->dp_meta_objset;
3477 	uint64_t zapobj;
3478 	uint64_t dsobj = ds->ds_object;
3479 	uint64_t refs;
3480 	int error;
3481 
3482 	mutex_enter(&ds->ds_lock);
3483 	ds->ds_userrefs--;
3484 	refs = ds->ds_userrefs;
3485 	mutex_exit(&ds->ds_lock);
3486 	error = dsl_pool_user_release(dp, ds->ds_object, ra->htag, tx);
3487 	VERIFY(error == 0 || error == ENOENT);
3488 	zapobj = ds->ds_phys->ds_userrefs_obj;
3489 	VERIFY(0 == zap_remove(mos, zapobj, ra->htag, tx));
3490 	if (ds->ds_userrefs == 0 && ds->ds_phys->ds_num_children == 1 &&
3491 	    DS_IS_DEFER_DESTROY(ds)) {
3492 		struct dsl_ds_destroyarg dsda = {0};
3493 
3494 		ASSERT(ra->own);
3495 		dsda.ds = ds;
3496 		dsda.releasing = B_TRUE;
3497 		/* We already did the destroy_check */
3498 		dsl_dataset_destroy_sync(&dsda, tag, cr, tx);
3499 	}
3500 
3501 	spa_history_internal_log(LOG_DS_USER_RELEASE,
3502 	    dp->dp_spa, tx, cr, "<%s> %lld dataset = %llu",
3503 	    ra->htag, (longlong_t)refs, dsobj);
3504 }
3505 
3506 static int
3507 dsl_dataset_user_release_one(char *dsname, void *arg)
3508 {
3509 	struct dsl_ds_holdarg *ha = arg;
3510 	struct dsl_ds_releasearg *ra;
3511 	dsl_dataset_t *ds;
3512 	int error;
3513 	void *dtag = ha->dstg;
3514 	char *name;
3515 	boolean_t own = B_FALSE;
3516 	boolean_t might_destroy;
3517 
3518 	/* alloc a buffer to hold dsname@snapname, plus the terminating NULL */
3519 	name = kmem_asprintf("%s@%s", dsname, ha->snapname);
3520 	error = dsl_dataset_hold(name, dtag, &ds);
3521 	strfree(name);
3522 	if (error == ENOENT && ha->recursive)
3523 		return (0);
3524 	(void) strcpy(ha->failed, dsname);
3525 	if (error)
3526 		return (error);
3527 
3528 	ha->gotone = B_TRUE;
3529 
3530 	ASSERT(dsl_dataset_is_snapshot(ds));
3531 
3532 	error = dsl_dataset_release_might_destroy(ds, ha->htag, &might_destroy);
3533 	if (error) {
3534 		dsl_dataset_rele(ds, dtag);
3535 		return (error);
3536 	}
3537 
3538 	if (might_destroy) {
3539 #ifdef _KERNEL
3540 		error = zfs_unmount_snap(name, NULL);
3541 		if (error) {
3542 			dsl_dataset_rele(ds, dtag);
3543 			return (error);
3544 		}
3545 #endif
3546 		if (!dsl_dataset_tryown(ds, B_TRUE, dtag)) {
3547 			dsl_dataset_rele(ds, dtag);
3548 			return (EBUSY);
3549 		} else {
3550 			own = B_TRUE;
3551 			dsl_dataset_make_exclusive(ds, dtag);
3552 		}
3553 	}
3554 
3555 	ra = kmem_alloc(sizeof (struct dsl_ds_releasearg), KM_SLEEP);
3556 	ra->ds = ds;
3557 	ra->htag = ha->htag;
3558 	ra->own = own;
3559 	dsl_sync_task_create(ha->dstg, dsl_dataset_user_release_check,
3560 	    dsl_dataset_user_release_sync, ra, dtag, 0);
3561 
3562 	return (0);
3563 }
3564 
3565 int
3566 dsl_dataset_user_release(char *dsname, char *snapname, char *htag,
3567     boolean_t recursive)
3568 {
3569 	struct dsl_ds_holdarg *ha;
3570 	dsl_sync_task_t *dst;
3571 	spa_t *spa;
3572 	int error;
3573 
3574 	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
3575 
3576 	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
3577 
3578 	error = spa_open(dsname, &spa, FTAG);
3579 	if (error) {
3580 		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3581 		return (error);
3582 	}
3583 
3584 	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
3585 	ha->htag = htag;
3586 	ha->snapname = snapname;
3587 	ha->recursive = recursive;
3588 	if (recursive) {
3589 		error = dmu_objset_find(dsname, dsl_dataset_user_release_one,
3590 		    ha, DS_FIND_CHILDREN);
3591 	} else {
3592 		error = dsl_dataset_user_release_one(dsname, ha);
3593 	}
3594 	if (error == 0)
3595 		error = dsl_sync_task_group_wait(ha->dstg);
3596 
3597 	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
3598 	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
3599 		struct dsl_ds_releasearg *ra = dst->dst_arg1;
3600 		dsl_dataset_t *ds = ra->ds;
3601 
3602 		if (dst->dst_err)
3603 			dsl_dataset_name(ds, ha->failed);
3604 
3605 		if (ra->own)
3606 			dsl_dataset_disown(ds, ha->dstg);
3607 		else
3608 			dsl_dataset_rele(ds, ha->dstg);
3609 
3610 		kmem_free(ra, sizeof (struct dsl_ds_releasearg));
3611 	}
3612 
3613 	if (error == 0 && recursive && !ha->gotone)
3614 		error = ENOENT;
3615 
3616 	if (error)
3617 		(void) strcpy(dsname, ha->failed);
3618 
3619 	dsl_sync_task_group_destroy(ha->dstg);
3620 	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
3621 	spa_close(spa, FTAG);
3622 	return (error);
3623 }
3624 
3625 /*
3626  * Called at spa_load time to release a stale temporary user hold.
3627  */
3628 int
3629 dsl_dataset_user_release_tmp(dsl_pool_t *dp, uint64_t dsobj, char *htag)
3630 {
3631 	dsl_dataset_t *ds;
3632 	char *snap;
3633 	char *name;
3634 	int namelen;
3635 	int error;
3636 
3637 	rw_enter(&dp->dp_config_rwlock, RW_READER);
3638 	error = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
3639 	rw_exit(&dp->dp_config_rwlock);
3640 	if (error)
3641 		return (error);
3642 	namelen = dsl_dataset_namelen(ds) + 1;
3643 	name = kmem_alloc(namelen, KM_SLEEP);
3644 	dsl_dataset_name(ds, name);
3645 	dsl_dataset_rele(ds, FTAG);
3646 
3647 	snap = strchr(name, '@');
3648 	*snap = '\0';
3649 	++snap;
3650 	return (dsl_dataset_user_release(name, snap, htag, B_FALSE));
3651 }
3652 
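/*
 * Return in *nvp an nvlist mapping each user-hold tag on the given
 * snapshot to the time (in seconds since the epoch) at which the hold
 * was taken.
 */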
3653 int
3654 dsl_dataset_get_holds(const char *dsname, nvlist_t **nvp)
3655 {
3656 	dsl_dataset_t *ds;
3657 	int err;
3658 
3659 	err = dsl_dataset_hold(dsname, FTAG, &ds);
3660 	if (err)
3661 		return (err);
3662 
3663 	VERIFY(0 == nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP));
3664 	if (ds->ds_phys->ds_userrefs_obj != 0) {
3665 		zap_attribute_t *za;
3666 		zap_cursor_t zc;
3667 
3668 		za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
3669 		for (zap_cursor_init(&zc, ds->ds_dir->dd_pool->dp_meta_objset,
3670 		    ds->ds_phys->ds_userrefs_obj);
3671 		    zap_cursor_retrieve(&zc, za) == 0;
3672 		    zap_cursor_advance(&zc)) {
3673 			VERIFY(0 == nvlist_add_uint64(*nvp, za->za_name,
3674 			    za->za_first_integer));
3675 		}
3676 		zap_cursor_fini(&zc);
3677 		kmem_free(za, sizeof (zap_attribute_t));
3678 	}
3679 	dsl_dataset_rele(ds, FTAG);
3680 	return (0);
3681 }
3682 
3683 /*
3684  * Note, this function is used as the callback for dmu_objset_find().  We
3685  * always return 0 so that we will continue to find and process
3686  * inconsistent datasets, even if we encounter an error trying to
3687  * process one of them.
3688  */
3689 /* ARGSUSED */
3690 int
3691 dsl_destroy_inconsistent(char *dsname, void *arg)
3692 {
3693 	dsl_dataset_t *ds;
3694 
3695 	if (dsl_dataset_own(dsname, B_TRUE, FTAG, &ds) == 0) {
3696 		if (DS_IS_INCONSISTENT(ds))
3697 			(void) dsl_dataset_destroy(ds, FTAG, B_FALSE);
3698 		else
3699 			dsl_dataset_disown(ds, FTAG);
3700 	}
3701 	return (0);
3702 }
3703