xref: /illumos-gate/usr/src/uts/common/fs/zfs/dmu_send.c (revision 0fa1b3cc)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
24  * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
25  * Copyright (c) 2014, Joyent, Inc. All rights reserved.
26  * Copyright 2014 HybridCluster. All rights reserved.
27  * Copyright 2016 RackTop Systems.
28  * Copyright (c) 2014 Integros [integros.com]
29  */
30 
31 #include <sys/dmu.h>
32 #include <sys/dmu_impl.h>
33 #include <sys/dmu_tx.h>
34 #include <sys/dbuf.h>
35 #include <sys/dnode.h>
36 #include <sys/zfs_context.h>
37 #include <sys/dmu_objset.h>
38 #include <sys/dmu_traverse.h>
39 #include <sys/dsl_dataset.h>
40 #include <sys/dsl_dir.h>
41 #include <sys/dsl_prop.h>
42 #include <sys/dsl_pool.h>
43 #include <sys/dsl_synctask.h>
44 #include <sys/zfs_ioctl.h>
45 #include <sys/zap.h>
46 #include <sys/zio_checksum.h>
47 #include <sys/zfs_znode.h>
48 #include <zfs_fletcher.h>
49 #include <sys/avl.h>
50 #include <sys/ddt.h>
51 #include <sys/zfs_onexit.h>
52 #include <sys/dmu_send.h>
53 #include <sys/dsl_destroy.h>
54 #include <sys/blkptr.h>
55 #include <sys/dsl_bookmark.h>
56 #include <sys/zfeature.h>
57 #include <sys/bqueue.h>
58 
59 /* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
60 int zfs_send_corrupt_data = B_FALSE;
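/*
 * Cap, in bytes of block data, on how far the traversal (prefetch) thread
 * may run ahead of the thread writing the stream; it sizes the bqueue
 * created in dmu_send_impl().
 */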
61 int zfs_send_queue_length = 16 * 1024 * 1024;
62 /* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
63 int zfs_send_set_freerecords_bit = B_TRUE;
64 
65 /*
66  * Use this to override the recordsize calculation for fast zfs send estimates.
67  */
68 uint64_t zfs_override_estimate_recordsize = 0;
69 
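/*
 * BP_SPAN computes how many bytes of the object a single block pointer at
 * the given level covers.  Illustrative example (assuming the usual
 * SPA_MINBLOCKSHIFT of 9 and SPA_BLKPTRSHIFT of 7): for 128K data blocks
 * (datablkszsec == 256) and 128K indirect blocks (indblkshift == 17), a
 * level-0 bp spans 256 << 9 == 128K, and a level-1 bp spans
 * 256 << (9 + 10) == 128M, since each indirect block holds 1024 blkptrs.
 */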
70 #define	BP_SPAN(datablkszsec, indblkshift, level) \
71 	(((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
72 	(level) * (indblkshift - SPA_BLKPTRSHIFT)))
73 
74 struct send_thread_arg {
75 	bqueue_t	q;
76 	dsl_dataset_t	*ds;		/* Dataset to traverse */
77 	uint64_t	fromtxg;	/* Traverse from this txg */
78 	int		flags;		/* flags to pass to traverse_dataset */
79 	int		error_code;
80 	boolean_t	cancel;
81 	zbookmark_phys_t resume;
82 };
83 
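/*
 * A send_block_record is the unit of work passed through the bqueue from the
 * traversal (producer) thread in send_traverse_thread() to the consumer in
 * dmu_send_impl(), which hands each record to do_dump().  A record with
 * eos_marker set signals that the traversal has finished.
 */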
84 struct send_block_record {
85 	boolean_t		eos_marker; /* Marks the end of the stream */
86 	blkptr_t		bp;
87 	zbookmark_phys_t	zb;
88 	uint8_t			indblkshift;
89 	uint16_t		datablkszsec;
90 	bqueue_node_t		ln;
91 };
92 
93 static int
94 dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
95 {
96 	dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
97 	ssize_t resid; /* have to get resid to get detailed errno */
98 
99 	/*
100 	 * The code does not rely on this (len being a multiple of 8).  We keep
101 	 * this assertion because of the corresponding assertion in
102 	 * receive_read().  Keeping this assertion ensures that we do not
103 	 * inadvertently break backwards compatibility (causing the assertion
104 	 * in receive_read() to trigger on old software).
105 	 *
106 	 * Removing the assertions could be rolled into a new feature that uses
107 	 * data that isn't 8-byte aligned; if the assertions were removed, a
108 	 * feature flag would have to be added.
109 	 */
110 
111 	ASSERT0(len % 8);
112 
113 	dsp->dsa_err = vn_rdwr(UIO_WRITE, dsp->dsa_vp,
114 	    (caddr_t)buf, len,
115 	    0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);
116 
117 	mutex_enter(&ds->ds_sendstream_lock);
118 	*dsp->dsa_off += len;
119 	mutex_exit(&ds->ds_sendstream_lock);
120 
121 	return (dsp->dsa_err);
122 }
123 
124 /*
125  * For all record types except BEGIN, fill in the checksum (overlaid in
126  * drr_u.drr_checksum.drr_checksum).  The checksum verifies everything
127  * up to the start of the checksum itself.
128  */
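/*
 * Note that dsa_zc is never reset, so the fletcher-4 checksum accumulates
 * across the whole stream: each record's embedded checksum covers everything
 * sent from the beginning of the stream up to (but not including) that
 * record's checksum field.
 */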
129 static int
130 dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
131 {
132 	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
133 	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
134 	(void) fletcher_4_incremental_native(dsp->dsa_drr,
135 	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
136 	    &dsp->dsa_zc);
137 	if (dsp->dsa_drr->drr_type == DRR_BEGIN) {
138 		dsp->dsa_sent_begin = B_TRUE;
139 	} else {
140 		ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
141 		    drr_checksum.drr_checksum));
142 		dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
143 	}
144 	if (dsp->dsa_drr->drr_type == DRR_END) {
145 		dsp->dsa_sent_end = B_TRUE;
146 	}
147 	(void) fletcher_4_incremental_native(&dsp->dsa_drr->
148 	    drr_u.drr_checksum.drr_checksum,
149 	    sizeof (zio_cksum_t), &dsp->dsa_zc);
150 	if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
151 		return (SET_ERROR(EINTR));
152 	if (payload_len != 0) {
153 		(void) fletcher_4_incremental_native(payload, payload_len,
154 		    &dsp->dsa_zc);
155 		if (dump_bytes(dsp, payload, payload_len) != 0)
156 			return (SET_ERROR(EINTR));
157 	}
158 	return (0);
159 }
160 
161 /*
162  * Fill in the drr_free struct, or perform aggregation if the previous record is
163  * also a free record, and the two are adjacent.
164  *
165  * Note that we send free records even for a full send, because we want to be
166  * able to receive a full send as a clone, which requires a list of all the free
167  * and freeobject records that were generated on the source.
168  */
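/*
 * Aggregation example (illustrative): a free of (object 5, offset 0,
 * length 0x20000) followed immediately by a free of (object 5,
 * offset 0x20000, length 0x20000) is emitted as a single DRR_FREE record
 * covering (object 5, offset 0, length 0x40000).
 */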
169 static int
170 dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
171     uint64_t length)
172 {
173 	struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);
174 
175 	/*
176 	 * When we receive a free record, dbuf_free_range() assumes
177 	 * that the receiving system doesn't have any dbufs in the range
178 	 * being freed.  This is always true because there is a one-record
179 	 * constraint: we only send one WRITE record for any given
180 	 * object,offset.  We know that the one-record constraint is
181 	 * true because we always send data in increasing order by
182 	 * object,offset.
183 	 *
184 	 * If the increasing-order constraint ever changes, we should find
185 	 * another way to assert that the one-record constraint is still
186 	 * satisfied.
187 	 */
188 	ASSERT(object > dsp->dsa_last_data_object ||
189 	    (object == dsp->dsa_last_data_object &&
190 	    offset > dsp->dsa_last_data_offset));
191 
192 	if (length != -1ULL && offset + length < offset)
193 		length = -1ULL;
194 
195 	/*
196 	 * If there is a pending op, but it's not PENDING_FREE, push it out,
197 	 * since free block aggregation can only be done for blocks of the
198 	 * same type (i.e., DRR_FREE records can only be aggregated with
199 	 * other DRR_FREE records; DRR_FREEOBJECTS records can only be
200 	 * aggregated with other DRR_FREEOBJECTS records).
201 	 */
202 	if (dsp->dsa_pending_op != PENDING_NONE &&
203 	    dsp->dsa_pending_op != PENDING_FREE) {
204 		if (dump_record(dsp, NULL, 0) != 0)
205 			return (SET_ERROR(EINTR));
206 		dsp->dsa_pending_op = PENDING_NONE;
207 	}
208 
209 	if (dsp->dsa_pending_op == PENDING_FREE) {
210 		/*
211 		 * There should never be a PENDING_FREE if length is -1
212 		 * (because dump_dnode is the only place where this
213 		 * function is called with a -1, and only after flushing
214 		 * any pending record).
215 		 */
216 		ASSERT(length != -1ULL);
217 		/*
218 		 * Check to see whether this free block can be aggregated
219 		 * with the pending one.
220 		 */
221 		if (drrf->drr_object == object && drrf->drr_offset +
222 		    drrf->drr_length == offset) {
223 			drrf->drr_length += length;
224 			return (0);
225 		} else {
226 			/* not a continuation.  Push out pending record */
227 			if (dump_record(dsp, NULL, 0) != 0)
228 				return (SET_ERROR(EINTR));
229 			dsp->dsa_pending_op = PENDING_NONE;
230 		}
231 	}
232 	/* create a FREE record and make it pending */
233 	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
234 	dsp->dsa_drr->drr_type = DRR_FREE;
235 	drrf->drr_object = object;
236 	drrf->drr_offset = offset;
237 	drrf->drr_length = length;
238 	drrf->drr_toguid = dsp->dsa_toguid;
239 	if (length == -1ULL) {
240 		if (dump_record(dsp, NULL, 0) != 0)
241 			return (SET_ERROR(EINTR));
242 	} else {
243 		dsp->dsa_pending_op = PENDING_FREE;
244 	}
245 
246 	return (0);
247 }
248 
249 static int
250 dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type,
251     uint64_t object, uint64_t offset, int lsize, int psize, const blkptr_t *bp,
252     void *data)
253 {
254 	uint64_t payload_size;
255 	struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);
256 
257 	/*
258 	 * We send data in increasing object, offset order.
259 	 * See comment in dump_free() for details.
260 	 */
261 	ASSERT(object > dsp->dsa_last_data_object ||
262 	    (object == dsp->dsa_last_data_object &&
263 	    offset > dsp->dsa_last_data_offset));
264 	dsp->dsa_last_data_object = object;
265 	dsp->dsa_last_data_offset = offset + lsize - 1;
266 
267 	/*
268 	 * If there is any kind of pending aggregation (currently either
269 	 * a grouping of free objects or free blocks), push it out to
270 	 * the stream, since aggregation can't be done across operations
271 	 * of different types.
272 	 */
273 	if (dsp->dsa_pending_op != PENDING_NONE) {
274 		if (dump_record(dsp, NULL, 0) != 0)
275 			return (SET_ERROR(EINTR));
276 		dsp->dsa_pending_op = PENDING_NONE;
277 	}
278 	/* write a WRITE record */
279 	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
280 	dsp->dsa_drr->drr_type = DRR_WRITE;
281 	drrw->drr_object = object;
282 	drrw->drr_type = type;
283 	drrw->drr_offset = offset;
284 	drrw->drr_toguid = dsp->dsa_toguid;
285 	drrw->drr_logical_size = lsize;
286 
287 	/* only set the compression fields if the buf is compressed */
288 	if (lsize != psize) {
289 		ASSERT(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_COMPRESSED);
290 		ASSERT(!BP_IS_EMBEDDED(bp));
291 		ASSERT(!BP_SHOULD_BYTESWAP(bp));
292 		ASSERT(!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)));
293 		ASSERT3U(BP_GET_COMPRESS(bp), !=, ZIO_COMPRESS_OFF);
294 		ASSERT3S(psize, >, 0);
295 		ASSERT3S(lsize, >=, psize);
296 
297 		drrw->drr_compressiontype = BP_GET_COMPRESS(bp);
298 		drrw->drr_compressed_size = psize;
299 		payload_size = drrw->drr_compressed_size;
300 	} else {
301 		payload_size = drrw->drr_logical_size;
302 	}
303 
304 	if (bp == NULL || BP_IS_EMBEDDED(bp)) {
305 		/*
306 		 * There's no pre-computed checksum for partial-block
307 		 * writes or embedded BPs, so (like
308 		 * fletcher4-checksummed blocks) userland will have to
309 		 * compute a dedup-capable checksum itself.
310 		 */
311 		drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
312 	} else {
313 		drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
314 		if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
315 		    ZCHECKSUM_FLAG_DEDUP)
316 			drrw->drr_checksumflags |= DRR_CHECKSUM_DEDUP;
317 		DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
318 		DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
319 		DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
320 		drrw->drr_key.ddk_cksum = bp->blk_cksum;
321 	}
322 
323 	if (dump_record(dsp, data, payload_size) != 0)
324 		return (SET_ERROR(EINTR));
325 	return (0);
326 }
327 
328 static int
329 dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
330     int blksz, const blkptr_t *bp)
331 {
332 	char buf[BPE_PAYLOAD_SIZE];
333 	struct drr_write_embedded *drrw =
334 	    &(dsp->dsa_drr->drr_u.drr_write_embedded);
335 
336 	if (dsp->dsa_pending_op != PENDING_NONE) {
337 		if (dump_record(dsp, NULL, 0) != 0)
338 			return (SET_ERROR(EINTR));
339 		dsp->dsa_pending_op = PENDING_NONE;
340 	}
341 
342 	ASSERT(BP_IS_EMBEDDED(bp));
343 
344 	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
345 	dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
346 	drrw->drr_object = object;
347 	drrw->drr_offset = offset;
348 	drrw->drr_length = blksz;
349 	drrw->drr_toguid = dsp->dsa_toguid;
350 	drrw->drr_compression = BP_GET_COMPRESS(bp);
351 	drrw->drr_etype = BPE_GET_ETYPE(bp);
352 	drrw->drr_lsize = BPE_GET_LSIZE(bp);
353 	drrw->drr_psize = BPE_GET_PSIZE(bp);
354 
355 	decode_embedded_bp_compressed(bp, buf);
356 
357 	if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
358 		return (SET_ERROR(EINTR));
359 	return (0);
360 }
361 
362 static int
363 dump_spill(dmu_sendarg_t *dsp, uint64_t object, int blksz, void *data)
364 {
365 	struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);
366 
367 	if (dsp->dsa_pending_op != PENDING_NONE) {
368 		if (dump_record(dsp, NULL, 0) != 0)
369 			return (SET_ERROR(EINTR));
370 		dsp->dsa_pending_op = PENDING_NONE;
371 	}
372 
373 	/* write a SPILL record */
374 	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
375 	dsp->dsa_drr->drr_type = DRR_SPILL;
376 	drrs->drr_object = object;
377 	drrs->drr_length = blksz;
378 	drrs->drr_toguid = dsp->dsa_toguid;
379 
380 	if (dump_record(dsp, data, blksz) != 0)
381 		return (SET_ERROR(EINTR));
382 	return (0);
383 }
384 
385 static int
386 dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
387 {
388 	struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);
389 
390 	/*
391 	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
392 	 * push it out, since free block aggregation can only be done for
393 	 * blocks of the same type (i.e., DRR_FREE records can only be
394 	 * aggregated with other DRR_FREE records; DRR_FREEOBJECTS records
395 	 * can only be aggregated with other DRR_FREEOBJECTS records).
396 	 */
397 	if (dsp->dsa_pending_op != PENDING_NONE &&
398 	    dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
399 		if (dump_record(dsp, NULL, 0) != 0)
400 			return (SET_ERROR(EINTR));
401 		dsp->dsa_pending_op = PENDING_NONE;
402 	}
403 	if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
404 		/*
405 		 * See whether this free object array can be aggregated
406 		 * with the pending one.
407 		 */
408 		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
409 			drrfo->drr_numobjs += numobjs;
410 			return (0);
411 		} else {
412 			/* can't be aggregated.  Push out pending record */
413 			if (dump_record(dsp, NULL, 0) != 0)
414 				return (SET_ERROR(EINTR));
415 			dsp->dsa_pending_op = PENDING_NONE;
416 		}
417 	}
418 
419 	/* write a FREEOBJECTS record */
420 	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
421 	dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
422 	drrfo->drr_firstobj = firstobj;
423 	drrfo->drr_numobjs = numobjs;
424 	drrfo->drr_toguid = dsp->dsa_toguid;
425 
426 	dsp->dsa_pending_op = PENDING_FREEOBJECTS;
427 
428 	return (0);
429 }
430 
431 static int
432 dump_dnode(dmu_sendarg_t *dsp, uint64_t object, dnode_phys_t *dnp)
433 {
434 	struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);
435 
436 	if (object < dsp->dsa_resume_object) {
437 		/*
438 		 * Note: when resuming, we will visit all the dnodes in
439 		 * the block of dnodes that we are resuming from.  In
440 		 * this case it's unnecessary to send the dnodes prior to
441 		 * the one we are resuming from.  We should be at most one
442 		 * block's worth of dnodes behind the resume point.
443 		 */
444 		ASSERT3U(dsp->dsa_resume_object - object, <,
445 		    1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
446 		return (0);
447 	}
448 
449 	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
450 		return (dump_freeobjects(dsp, object, 1));
451 
452 	if (dsp->dsa_pending_op != PENDING_NONE) {
453 		if (dump_record(dsp, NULL, 0) != 0)
454 			return (SET_ERROR(EINTR));
455 		dsp->dsa_pending_op = PENDING_NONE;
456 	}
457 
458 	/* write an OBJECT record */
459 	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
460 	dsp->dsa_drr->drr_type = DRR_OBJECT;
461 	drro->drr_object = object;
462 	drro->drr_type = dnp->dn_type;
463 	drro->drr_bonustype = dnp->dn_bonustype;
464 	drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
465 	drro->drr_bonuslen = dnp->dn_bonuslen;
466 	drro->drr_dn_slots = dnp->dn_extra_slots + 1;
467 	drro->drr_checksumtype = dnp->dn_checksum;
468 	drro->drr_compress = dnp->dn_compress;
469 	drro->drr_toguid = dsp->dsa_toguid;
470 
471 	if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
472 	    drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
473 		drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;
474 
475 	if (dump_record(dsp, DN_BONUS(dnp),
476 	    P2ROUNDUP(dnp->dn_bonuslen, 8)) != 0) {
477 		return (SET_ERROR(EINTR));
478 	}
479 
480 	/* Free anything past the end of the file. */
481 	if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
482 	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL) != 0)
483 		return (SET_ERROR(EINTR));
484 	if (dsp->dsa_err != 0)
485 		return (SET_ERROR(EINTR));
486 	return (0);
487 }
488 
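/*
 * Decide whether a block can be sent as a DRR_WRITE_EMBEDDED record: the bp
 * must be embedded, its compression function must be legacy or explicitly
 * enabled (DMU_BACKUP_FEATURE_LZ4), and its embed type must be explicitly
 * enabled (currently only BP_EMBEDDED_TYPE_DATA via
 * DMU_BACKUP_FEATURE_EMBED_DATA).
 */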
489 static boolean_t
490 backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
491 {
492 	if (!BP_IS_EMBEDDED(bp))
493 		return (B_FALSE);
494 
495 	/*
496 	 * Compression function must be legacy, or explicitly enabled.
497 	 */
498 	if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
499 	    !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LZ4)))
500 		return (B_FALSE);
501 
502 	/*
503 	 * Embed type must be explicitly enabled.
504 	 */
505 	switch (BPE_GET_ETYPE(bp)) {
506 	case BP_EMBEDDED_TYPE_DATA:
507 		if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
508 			return (B_TRUE);
509 		break;
510 	default:
511 		return (B_FALSE);
512 	}
513 	return (B_FALSE);
514 }
515 
516 /*
517  * This is the callback function to traverse_dataset; it runs in the worker
518  * thread for dmu_send_impl and enqueues block records for the main thread.
519  */
520 /*ARGSUSED*/
521 static int
522 send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
523     const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
524 {
525 	struct send_thread_arg *sta = arg;
526 	struct send_block_record *record;
527 	uint64_t record_size;
528 	int err = 0;
529 
530 	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
531 	    zb->zb_object >= sta->resume.zb_object);
532 
533 	if (sta->cancel)
534 		return (SET_ERROR(EINTR));
535 
536 	if (bp == NULL) {
537 		ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
538 		return (0);
539 	} else if (zb->zb_level < 0) {
540 		return (0);
541 	}
542 
543 	record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
544 	record->eos_marker = B_FALSE;
545 	record->bp = *bp;
546 	record->zb = *zb;
547 	record->indblkshift = dnp->dn_indblkshift;
548 	record->datablkszsec = dnp->dn_datablkszsec;
549 	record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
550 	bqueue_enqueue(&sta->q, record, record_size);
551 
552 	return (err);
553 }
554 
555 /*
556  * This function kicks off the traverse_dataset.  It also handles setting the
557  * error code of the thread in case something goes wrong, and pushes the End of
558  * Stream record when the traverse_dataset call has finished.  If there is no
559  * dataset to traverse, the thread immediately pushes the End of Stream marker.
560  */
561 static void
562 send_traverse_thread(void *arg)
563 {
564 	struct send_thread_arg *st_arg = arg;
565 	int err;
566 	struct send_block_record *data;
567 
568 	if (st_arg->ds != NULL) {
569 		err = traverse_dataset_resume(st_arg->ds,
570 		    st_arg->fromtxg, &st_arg->resume,
571 		    st_arg->flags, send_cb, st_arg);
572 
573 		if (err != EINTR)
574 			st_arg->error_code = err;
575 	}
576 	data = kmem_zalloc(sizeof (*data), KM_SLEEP);
577 	data->eos_marker = B_TRUE;
578 	bqueue_enqueue(&st_arg->q, data, 1);
579 	thread_exit();
580 }
581 
582 /*
583  * This function actually handles figuring out what kind of record needs to be
584  * dumped, reading the data (which has hopefully been prefetched), and calling
585  * the appropriate helper function.
586  */
587 static int
588 do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
589 {
590 	dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
591 	const blkptr_t *bp = &data->bp;
592 	const zbookmark_phys_t *zb = &data->zb;
593 	uint8_t indblkshift = data->indblkshift;
594 	uint16_t dblkszsec = data->datablkszsec;
595 	spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
596 	dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
597 	int err = 0;
598 
599 	ASSERT3U(zb->zb_level, >=, 0);
600 
601 	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
602 	    zb->zb_object >= dsa->dsa_resume_object);
603 
604 	if (zb->zb_object != DMU_META_DNODE_OBJECT &&
605 	    DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
606 		return (0);
607 	} else if (BP_IS_HOLE(bp) &&
608 	    zb->zb_object == DMU_META_DNODE_OBJECT) {
609 		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
610 		uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
611 		err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
612 	} else if (BP_IS_HOLE(bp)) {
613 		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
614 		uint64_t offset = zb->zb_blkid * span;
615 		err = dump_free(dsa, zb->zb_object, offset, span);
616 	} else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
617 		return (0);
618 	} else if (type == DMU_OT_DNODE) {
619 		int epb = BP_GET_LSIZE(bp) >> DNODE_SHIFT;
620 		arc_flags_t aflags = ARC_FLAG_WAIT;
621 		arc_buf_t *abuf;
622 
623 		ASSERT0(zb->zb_level);
624 
625 		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
626 		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
627 		    &aflags, zb) != 0)
628 			return (SET_ERROR(EIO));
629 
630 		dnode_phys_t *blk = abuf->b_data;
631 		uint64_t dnobj = zb->zb_blkid * epb;
632 		for (int i = 0; i < epb; i += blk[i].dn_extra_slots + 1) {
633 			err = dump_dnode(dsa, dnobj + i, blk + i);
634 			if (err != 0)
635 				break;
636 		}
637 		arc_buf_destroy(abuf, &abuf);
638 	} else if (type == DMU_OT_SA) {
639 		arc_flags_t aflags = ARC_FLAG_WAIT;
640 		arc_buf_t *abuf;
641 		int blksz = BP_GET_LSIZE(bp);
642 
643 		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
644 		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
645 		    &aflags, zb) != 0)
646 			return (SET_ERROR(EIO));
647 
648 		err = dump_spill(dsa, zb->zb_object, blksz, abuf->b_data);
649 		arc_buf_destroy(abuf, &abuf);
650 	} else if (backup_do_embed(dsa, bp)) {
651 		/* it's an embedded level-0 block of a regular object */
652 		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
653 		ASSERT0(zb->zb_level);
654 		err = dump_write_embedded(dsa, zb->zb_object,
655 		    zb->zb_blkid * blksz, blksz, bp);
656 	} else {
657 		/* it's a level-0 block of a regular object */
658 		arc_flags_t aflags = ARC_FLAG_WAIT;
659 		arc_buf_t *abuf;
660 		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
661 		uint64_t offset;
662 
663 		/*
664 		 * If we have large blocks stored on disk but the send flags
665 		 * don't allow us to send large blocks, we split the data from
666 		 * the arc buf into chunks.
667 		 */
668 		boolean_t split_large_blocks = blksz > SPA_OLD_MAXBLOCKSIZE &&
669 		    !(dsa->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
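		/*
		 * For example (illustrative): a 1M block sent without
		 * DMU_BACKUP_FEATURE_LARGE_BLOCKS is emitted as eight
		 * 128K (SPA_OLD_MAXBLOCKSIZE) DRR_WRITE records.
		 */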
670 		/*
671 		 * We should only request compressed data from the ARC if all
672 		 * the following are true:
673 		 *  - stream compression was requested
674 		 *  - we aren't splitting large blocks into smaller chunks
675 		 *  - the data won't need to be byteswapped before sending
676 		 *  - this isn't an embedded block
677 		 *  - this isn't metadata (if receiving on a different endian
678 		 *    system it can be byteswapped more easily)
679 		 */
680 		boolean_t request_compressed =
681 		    (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
682 		    !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
683 		    !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
684 
685 		ASSERT0(zb->zb_level);
686 		ASSERT(zb->zb_object > dsa->dsa_resume_object ||
687 		    (zb->zb_object == dsa->dsa_resume_object &&
688 		    zb->zb_blkid * blksz >= dsa->dsa_resume_offset));
689 
695 		ASSERT3U(blksz, ==, BP_GET_LSIZE(bp));
696 
697 		enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
698 		if (request_compressed)
699 			zioflags |= ZIO_FLAG_RAW;
700 		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
701 		    ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0) {
702 			if (zfs_send_corrupt_data) {
703 				/* Send a block filled with 0x"zfs badd bloc" */
704 				abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
705 				    blksz);
706 				uint64_t *ptr;
707 				for (ptr = abuf->b_data;
708 				    (char *)ptr < (char *)abuf->b_data + blksz;
709 				    ptr++)
710 					*ptr = 0x2f5baddb10cULL;
711 			} else {
712 				return (SET_ERROR(EIO));
713 			}
714 		}
715 
716 		offset = zb->zb_blkid * blksz;
717 
718 		if (split_large_blocks) {
719 			ASSERT3U(arc_get_compression(abuf), ==,
720 			    ZIO_COMPRESS_OFF);
721 			char *buf = abuf->b_data;
722 			while (blksz > 0 && err == 0) {
723 				int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
724 				err = dump_write(dsa, type, zb->zb_object,
725 				    offset, n, n, NULL, buf);
726 				offset += n;
727 				buf += n;
728 				blksz -= n;
729 			}
730 		} else {
731 			err = dump_write(dsa, type, zb->zb_object, offset,
732 			    blksz, arc_buf_size(abuf), bp, abuf->b_data);
733 		}
734 		arc_buf_destroy(abuf, &abuf);
735 	}
736 
737 	ASSERT(err == 0 || err == EINTR);
738 	return (err);
739 }
740 
741 /*
742  * Pop the new data off the queue, and free the old data.
743  */
744 static struct send_block_record *
745 get_next_record(bqueue_t *bq, struct send_block_record *data)
746 {
747 	struct send_block_record *tmp = bqueue_dequeue(bq);
748 	kmem_free(data, sizeof (*data));
749 	return (tmp);
750 }
751 
752 /*
753  * Actually do the bulk of the work in a zfs send.
754  *
755  * Note: Releases dp using the specified tag.
756  */
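/*
 * In outline: write the DRR_BEGIN record (with a resume payload if this is a
 * resumed send), spawn send_traverse_thread() to enqueue block records, drain
 * the queue through do_dump() until the end-of-stream marker, flush any
 * pending aggregated record, and finish with a DRR_END record carrying the
 * stream checksum.
 */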
757 static int
758 dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
759     zfs_bookmark_phys_t *ancestor_zb, boolean_t is_clone,
760     boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
761     int outfd, uint64_t resumeobj, uint64_t resumeoff,
762     vnode_t *vp, offset_t *off)
763 {
764 	objset_t *os;
765 	dmu_replay_record_t *drr;
766 	dmu_sendarg_t *dsp;
767 	int err;
768 	uint64_t fromtxg = 0;
769 	uint64_t featureflags = 0;
770 	struct send_thread_arg to_arg = { 0 };
771 
772 	err = dmu_objset_from_ds(to_ds, &os);
773 	if (err != 0) {
774 		dsl_pool_rele(dp, tag);
775 		return (err);
776 	}
777 
778 	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
779 	drr->drr_type = DRR_BEGIN;
780 	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
781 	DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
782 	    DMU_SUBSTREAM);
783 
784 #ifdef _KERNEL
785 	if (dmu_objset_type(os) == DMU_OST_ZFS) {
786 		uint64_t version;
787 		if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
788 			kmem_free(drr, sizeof (dmu_replay_record_t));
789 			dsl_pool_rele(dp, tag);
790 			return (SET_ERROR(EINVAL));
791 		}
792 		if (version >= ZPL_VERSION_SA) {
793 			featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
794 		}
795 	}
796 #endif
797 
798 	if (large_block_ok && to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_BLOCKS])
799 		featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
800 	if (to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_DNODE])
801 		featureflags |= DMU_BACKUP_FEATURE_LARGE_DNODE;
802 	if (embedok &&
803 	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
804 		featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
805 		if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
806 			featureflags |= DMU_BACKUP_FEATURE_LZ4;
807 	}
808 	if (compressok) {
809 		featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;
810 	}
811 	if ((featureflags &
812 	    (DMU_BACKUP_FEATURE_EMBED_DATA | DMU_BACKUP_FEATURE_COMPRESSED)) !=
813 	    0 && spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) {
814 		featureflags |= DMU_BACKUP_FEATURE_LZ4;
815 	}
816 
817 	if (resumeobj != 0 || resumeoff != 0) {
818 		featureflags |= DMU_BACKUP_FEATURE_RESUMING;
819 	}
820 
821 	DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
822 	    featureflags);
823 
824 	drr->drr_u.drr_begin.drr_creation_time =
825 	    dsl_dataset_phys(to_ds)->ds_creation_time;
826 	drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
827 	if (is_clone)
828 		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
829 	drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
830 	if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
831 		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
832 	if (zfs_send_set_freerecords_bit)
833 		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_FREERECORDS;
834 
835 	if (ancestor_zb != NULL) {
836 		drr->drr_u.drr_begin.drr_fromguid =
837 		    ancestor_zb->zbm_guid;
838 		fromtxg = ancestor_zb->zbm_creation_txg;
839 	}
840 	dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
841 	if (!to_ds->ds_is_snapshot) {
842 		(void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
843 		    sizeof (drr->drr_u.drr_begin.drr_toname));
844 	}
845 
846 	dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);
847 
848 	dsp->dsa_drr = drr;
849 	dsp->dsa_vp = vp;
850 	dsp->dsa_outfd = outfd;
851 	dsp->dsa_proc = curproc;
852 	dsp->dsa_os = os;
853 	dsp->dsa_off = off;
854 	dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
855 	dsp->dsa_pending_op = PENDING_NONE;
856 	dsp->dsa_featureflags = featureflags;
857 	dsp->dsa_resume_object = resumeobj;
858 	dsp->dsa_resume_offset = resumeoff;
859 
860 	mutex_enter(&to_ds->ds_sendstream_lock);
861 	list_insert_head(&to_ds->ds_sendstreams, dsp);
862 	mutex_exit(&to_ds->ds_sendstream_lock);
863 
864 	dsl_dataset_long_hold(to_ds, FTAG);
865 	dsl_pool_rele(dp, tag);
866 
867 	void *payload = NULL;
868 	size_t payload_len = 0;
869 	if (resumeobj != 0 || resumeoff != 0) {
870 		dmu_object_info_t to_doi;
871 		err = dmu_object_info(os, resumeobj, &to_doi);
872 		if (err != 0)
873 			goto out;
874 		SET_BOOKMARK(&to_arg.resume, to_ds->ds_object, resumeobj, 0,
875 		    resumeoff / to_doi.doi_data_block_size);
876 
877 		nvlist_t *nvl = fnvlist_alloc();
878 		fnvlist_add_uint64(nvl, "resume_object", resumeobj);
879 		fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
880 		payload = fnvlist_pack(nvl, &payload_len);
881 		drr->drr_payloadlen = payload_len;
882 		fnvlist_free(nvl);
883 	}
884 
885 	err = dump_record(dsp, payload, payload_len);
886 	fnvlist_pack_free(payload, payload_len);
887 	if (err != 0) {
888 		err = dsp->dsa_err;
889 		goto out;
890 	}
891 
892 	err = bqueue_init(&to_arg.q, zfs_send_queue_length,
893 	    offsetof(struct send_block_record, ln));
894 	to_arg.error_code = 0;
895 	to_arg.cancel = B_FALSE;
896 	to_arg.ds = to_ds;
897 	to_arg.fromtxg = fromtxg;
898 	to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
899 	(void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, curproc,
900 	    TS_RUN, minclsyspri);
901 
902 	struct send_block_record *to_data;
903 	to_data = bqueue_dequeue(&to_arg.q);
904 
905 	while (!to_data->eos_marker && err == 0) {
906 		err = do_dump(dsp, to_data);
907 		to_data = get_next_record(&to_arg.q, to_data);
908 		if (issig(JUSTLOOKING) && issig(FORREAL))
909 			err = EINTR;
910 	}
911 
912 	if (err != 0) {
913 		to_arg.cancel = B_TRUE;
914 		while (!to_data->eos_marker) {
915 			to_data = get_next_record(&to_arg.q, to_data);
916 		}
917 	}
918 	kmem_free(to_data, sizeof (*to_data));
919 
920 	bqueue_destroy(&to_arg.q);
921 
922 	if (err == 0 && to_arg.error_code != 0)
923 		err = to_arg.error_code;
924 
925 	if (err != 0)
926 		goto out;
927 
928 	if (dsp->dsa_pending_op != PENDING_NONE)
929 		if (dump_record(dsp, NULL, 0) != 0)
930 			err = SET_ERROR(EINTR);
931 
932 	if (err != 0) {
933 		if (err == EINTR && dsp->dsa_err != 0)
934 			err = dsp->dsa_err;
935 		goto out;
936 	}
937 
938 	bzero(drr, sizeof (dmu_replay_record_t));
939 	drr->drr_type = DRR_END;
940 	drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
941 	drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;
942 
943 	if (dump_record(dsp, NULL, 0) != 0)
944 		err = dsp->dsa_err;
945 
946 out:
947 	mutex_enter(&to_ds->ds_sendstream_lock);
948 	list_remove(&to_ds->ds_sendstreams, dsp);
949 	mutex_exit(&to_ds->ds_sendstream_lock);
950 
951 	VERIFY(err != 0 || (dsp->dsa_sent_begin && dsp->dsa_sent_end));
952 
953 	kmem_free(drr, sizeof (dmu_replay_record_t));
954 	kmem_free(dsp, sizeof (dmu_sendarg_t));
955 
956 	dsl_dataset_long_rele(to_ds, FTAG);
957 
958 	return (err);
959 }
960 
961 int
962 dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
963     boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
964     int outfd, vnode_t *vp, offset_t *off)
965 {
966 	dsl_pool_t *dp;
967 	dsl_dataset_t *ds;
968 	dsl_dataset_t *fromds = NULL;
969 	int err;
970 
971 	err = dsl_pool_hold(pool, FTAG, &dp);
972 	if (err != 0)
973 		return (err);
974 
975 	err = dsl_dataset_hold_obj(dp, tosnap, FTAG, &ds);
976 	if (err != 0) {
977 		dsl_pool_rele(dp, FTAG);
978 		return (err);
979 	}
980 
981 	if (fromsnap != 0) {
982 		zfs_bookmark_phys_t zb;
983 		boolean_t is_clone;
984 
985 		err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
986 		if (err != 0) {
987 			dsl_dataset_rele(ds, FTAG);
988 			dsl_pool_rele(dp, FTAG);
989 			return (err);
990 		}
991 		if (!dsl_dataset_is_before(ds, fromds, 0))
992 			err = SET_ERROR(EXDEV);
993 		zb.zbm_creation_time =
994 		    dsl_dataset_phys(fromds)->ds_creation_time;
995 		zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
996 		zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
997 		is_clone = (fromds->ds_dir != ds->ds_dir);
998 		dsl_dataset_rele(fromds, FTAG);
999 		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
1000 		    embedok, large_block_ok, compressok, outfd, 0, 0, vp, off);
1001 	} else {
1002 		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
1003 		    embedok, large_block_ok, compressok, outfd, 0, 0, vp, off);
1004 	}
1005 	dsl_dataset_rele(ds, FTAG);
1006 	return (err);
1007 }
1008 
1009 int
1010 dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
1011     boolean_t large_block_ok, boolean_t compressok, int outfd,
1012     uint64_t resumeobj, uint64_t resumeoff,
1013     vnode_t *vp, offset_t *off)
1014 {
1015 	dsl_pool_t *dp;
1016 	dsl_dataset_t *ds;
1017 	int err;
1018 	boolean_t owned = B_FALSE;
1019 
1020 	if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
1021 		return (SET_ERROR(EINVAL));
1022 
1023 	err = dsl_pool_hold(tosnap, FTAG, &dp);
1024 	if (err != 0)
1025 		return (err);
1026 
1027 	if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
1028 		/*
1029 		 * We are sending a filesystem or volume.  Ensure
1030 		 * that it doesn't change by owning the dataset.
1031 		 */
1032 		err = dsl_dataset_own(dp, tosnap, FTAG, &ds);
1033 		owned = B_TRUE;
1034 	} else {
1035 		err = dsl_dataset_hold(dp, tosnap, FTAG, &ds);
1036 	}
1037 	if (err != 0) {
1038 		dsl_pool_rele(dp, FTAG);
1039 		return (err);
1040 	}
1041 
1042 	if (fromsnap != NULL) {
1043 		zfs_bookmark_phys_t zb;
1044 		boolean_t is_clone = B_FALSE;
1045 		int fsnamelen = strchr(tosnap, '@') - tosnap;
1046 
1047 		/*
1048 		 * If the fromsnap is in a different filesystem, then
1049 		 * mark the send stream as a clone.
1050 		 */
1051 		if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
1052 		    (fromsnap[fsnamelen] != '@' &&
1053 		    fromsnap[fsnamelen] != '#')) {
1054 			is_clone = B_TRUE;
1055 		}
1056 
1057 		if (strchr(fromsnap, '@')) {
1058 			dsl_dataset_t *fromds;
1059 			err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
1060 			if (err == 0) {
1061 				if (!dsl_dataset_is_before(ds, fromds, 0))
1062 					err = SET_ERROR(EXDEV);
1063 				zb.zbm_creation_time =
1064 				    dsl_dataset_phys(fromds)->ds_creation_time;
1065 				zb.zbm_creation_txg =
1066 				    dsl_dataset_phys(fromds)->ds_creation_txg;
1067 				zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
1068 				is_clone = (ds->ds_dir != fromds->ds_dir);
1069 				dsl_dataset_rele(fromds, FTAG);
1070 			}
1071 		} else {
1072 			err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
1073 		}
1074 		if (err != 0) {
1075 			dsl_dataset_rele(ds, FTAG);
1076 			dsl_pool_rele(dp, FTAG);
1077 			return (err);
1078 		}
1079 		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
1080 		    embedok, large_block_ok, compressok,
1081 		    outfd, resumeobj, resumeoff, vp, off);
1082 	} else {
1083 		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
1084 		    embedok, large_block_ok, compressok,
1085 		    outfd, resumeobj, resumeoff, vp, off);
1086 	}
1087 	if (owned)
1088 		dsl_dataset_disown(ds, FTAG);
1089 	else
1090 		dsl_dataset_rele(ds, FTAG);
1091 	return (err);
1092 }
1093 
1094 static int
1095 dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed,
1096     uint64_t compressed, boolean_t stream_compressed, uint64_t *sizep)
1097 {
1098 	int err = 0;
1099 	uint64_t size;
1100 	/*
1101 	 * Assume that space (both on-disk and in-stream) is dominated by
1102 	 * data.  We will adjust for indirect blocks and the copies property,
1103 	 * but ignore per-object space used (e.g., dnodes and DRR_OBJECT records).
1104 	 */
1105 	uint64_t recordsize;
1106 	uint64_t record_count;
1107 	objset_t *os;
1108 	VERIFY0(dmu_objset_from_ds(ds, &os));
1109 
1110 	/* Assume all (uncompressed) blocks are recordsize. */
1111 	if (zfs_override_estimate_recordsize != 0) {
1112 		recordsize = zfs_override_estimate_recordsize;
1113 	} else if (os->os_phys->os_type == DMU_OST_ZVOL) {
1114 		err = dsl_prop_get_int_ds(ds,
1115 		    zfs_prop_to_name(ZFS_PROP_VOLBLOCKSIZE), &recordsize);
1116 	} else {
1117 		err = dsl_prop_get_int_ds(ds,
1118 		    zfs_prop_to_name(ZFS_PROP_RECORDSIZE), &recordsize);
1119 	}
1120 	if (err != 0)
1121 		return (err);
1122 	record_count = uncompressed / recordsize;
1123 
1124 	/*
1125 	 * If we're estimating a send size for a compressed stream, use the
1126 	 * compressed data size to estimate the stream size. Otherwise, use the
1127 	 * uncompressed data size.
1128 	 */
1129 	size = stream_compressed ? compressed : uncompressed;
1130 
1131 	/*
1132 	 * Subtract out approximate space used by indirect blocks.
1133 	 * Assume most space is used by data blocks (non-indirect, non-dnode).
1134 	 * Assume no ditto blocks or internal fragmentation.
1135 	 *
1136 	 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
1137 	 * block.
1138 	 */
1139 	size -= record_count * sizeof (blkptr_t);
1140 
1141 	/* Add in the space for the record associated with each block. */
1142 	size += record_count * sizeof (dmu_replay_record_t);
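	/*
	 * Worked example (illustrative): for 1G of data in 128K records,
	 * record_count is 8192; we subtract 8192 * sizeof (blkptr_t)
	 * (128 bytes) of indirect-block overhead and add back 8192 *
	 * sizeof (dmu_replay_record_t) (312 bytes) of per-record stream
	 * headers, a net addition of roughly 1.5M to the estimate.
	 */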
1143 
1144 	*sizep = size;
1145 
1146 	return (0);
1147 }
1148 
1149 int
1150 dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds,
1151     boolean_t stream_compressed, uint64_t *sizep)
1152 {
1153 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1154 	int err;
1155 	uint64_t uncomp, comp;
1156 
1157 	ASSERT(dsl_pool_config_held(dp));
1158 
1159 	/* tosnap must be a snapshot */
1160 	if (!ds->ds_is_snapshot)
1161 		return (SET_ERROR(EINVAL));
1162 
1163 	/* fromsnap, if provided, must be a snapshot */
1164 	if (fromds != NULL && !fromds->ds_is_snapshot)
1165 		return (SET_ERROR(EINVAL));
1166 
1167 	/*
1168 	 * fromsnap must be an earlier snapshot from the same fs as tosnap,
1169 	 * or the origin's fs.
1170 	 */
1171 	if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
1172 		return (SET_ERROR(EXDEV));
1173 
1174 	/* Get compressed and uncompressed size estimates of changed data. */
1175 	if (fromds == NULL) {
1176 		uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
1177 		comp = dsl_dataset_phys(ds)->ds_compressed_bytes;
1178 	} else {
1179 		uint64_t used;
1180 		err = dsl_dataset_space_written(fromds, ds,
1181 		    &used, &comp, &uncomp);
1182 		if (err != 0)
1183 			return (err);
1184 	}
1185 
1186 	err = dmu_adjust_send_estimate_for_indirects(ds, uncomp, comp,
1187 	    stream_compressed, sizep);
1188 	/*
1189 	 * Add the size of the BEGIN and END records to the estimate.
1190 	 */
1191 	*sizep += 2 * sizeof (dmu_replay_record_t);
1192 	return (err);
1193 }
1194 
1195 struct calculate_send_arg {
1196 	uint64_t uncompressed;
1197 	uint64_t compressed;
1198 };
1199 
1200 /*
1201  * Simple callback used to traverse the blocks of a snapshot and sum their
1202  * uncompressed and compressed sizes.
1203  */
1204 /* ARGSUSED */
1205 static int
1206 dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
1207     const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
1208 {
1209 	struct calculate_send_arg *space = arg;
1210 	if (bp != NULL && !BP_IS_HOLE(bp)) {
1211 		space->uncompressed += BP_GET_UCSIZE(bp);
1212 		space->compressed += BP_GET_PSIZE(bp);
1213 	}
1214 	return (0);
1215 }
1216 
1217 /*
1218  * Given a destination snapshot and a TXG, calculate the approximate size of a
1219  * send stream sent from that TXG. from_txg may be zero, indicating that the
1220  * whole snapshot will be sent.
1221  */
1222 int
1223 dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
1224     boolean_t stream_compressed, uint64_t *sizep)
1225 {
1226 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1227 	int err;
1228 	struct calculate_send_arg size = { 0 };
1229 
1230 	ASSERT(dsl_pool_config_held(dp));
1231 
1232 	/* tosnap must be a snapshot */
1233 	if (!ds->ds_is_snapshot)
1234 		return (SET_ERROR(EINVAL));
1235 
1236 	/* verify that from_txg is before the provided snapshot was taken */
1237 	if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
1238 		return (SET_ERROR(EXDEV));
1239 	}
1240 
1241 	/*
1242 	 * traverse the blocks of the snapshot with birth times after
1243 	 * from_txg, summing their uncompressed and compressed sizes
1244 	 */
1245 	err = traverse_dataset(ds, from_txg, TRAVERSE_POST,
1246 	    dmu_calculate_send_traversal, &size);
1247 	if (err)
1248 		return (err);
1249 
1250 	err = dmu_adjust_send_estimate_for_indirects(ds, size.uncompressed,
1251 	    size.compressed, stream_compressed, sizep);
1252 	return (err);
1253 }
1254