xref: /illumos-gate/usr/src/uts/common/fs/zfs/dmu_send.c (revision 6ccda740e007c01cb5d1436fe337851ff8c5d422)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
24  * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
25  * Copyright (c) 2014, Joyent, Inc. All rights reserved.
26  * Copyright 2014 HybridCluster. All rights reserved.
27  * Copyright 2016 RackTop Systems.
28  * Copyright (c) 2014 Integros [integros.com]
29  */
30 
31 #include <sys/dmu.h>
32 #include <sys/dmu_impl.h>
33 #include <sys/dmu_tx.h>
34 #include <sys/dbuf.h>
35 #include <sys/dnode.h>
36 #include <sys/zfs_context.h>
37 #include <sys/dmu_objset.h>
38 #include <sys/dmu_traverse.h>
39 #include <sys/dsl_dataset.h>
40 #include <sys/dsl_dir.h>
41 #include <sys/dsl_prop.h>
42 #include <sys/dsl_pool.h>
43 #include <sys/dsl_synctask.h>
44 #include <sys/zfs_ioctl.h>
45 #include <sys/zap.h>
46 #include <sys/zio_checksum.h>
47 #include <sys/zfs_znode.h>
48 #include <zfs_fletcher.h>
49 #include <sys/avl.h>
50 #include <sys/ddt.h>
51 #include <sys/zfs_onexit.h>
52 #include <sys/dmu_send.h>
53 #include <sys/dsl_destroy.h>
54 #include <sys/blkptr.h>
55 #include <sys/dsl_bookmark.h>
56 #include <sys/zfeature.h>
57 #include <sys/bqueue.h>
58 
59 /* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
60 int zfs_send_corrupt_data = B_FALSE;
61 int zfs_send_queue_length = SPA_MAXBLOCKSIZE;
62 /* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
63 int zfs_send_set_freerecords_bit = B_TRUE;
64 /* Set this tunable to FALSE is disable sending unmodified spill blocks. */
65 int zfs_send_unmodified_spill_blocks = B_TRUE;
66 
67 /*
68  * Use this to override the recordsize calculation for fast zfs send estimates.
69  */
70 uint64_t zfs_override_estimate_recordsize = 0;
71 
72 #define	BP_SPAN(datablkszsec, indblkshift, level) \
73 	(((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
74 	(level) * (indblkshift - SPA_BLKPTRSHIFT)))
75 
76 struct send_thread_arg {
77 	bqueue_t	q;
78 	dsl_dataset_t	*ds;		/* Dataset to traverse */
79 	uint64_t	fromtxg;	/* Traverse from this txg */
80 	int		flags;		/* flags to pass to traverse_dataset */
81 	int		error_code;
82 	boolean_t	cancel;
83 	zbookmark_phys_t resume;
84 };
85 
86 struct send_block_record {
87 	boolean_t		eos_marker; /* Marks the end of the stream */
88 	blkptr_t		bp;
89 	zbookmark_phys_t	zb;
90 	uint8_t			indblkshift;
91 	uint16_t		datablkszsec;
92 	bqueue_node_t		ln;
93 };
94 
95 static int do_dump(dmu_sendarg_t *dsa, struct send_block_record *data);
96 
97 static int
98 dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
99 {
100 	dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
101 	ssize_t resid; /* have to get resid to get detailed errno */
102 
103 	/*
104 	 * The code does not rely on len being a multiple of 8.  We keep
105 	 * this assertion because of the corresponding assertion in
106 	 * receive_read().  Keeping this assertion ensures that we do not
107 	 * inadvertently break backwards compatibility (causing the assertion
108 	 * in receive_read() to trigger on old software). Newer feature flags
109 	 * (such as raw send) may break this assertion since they were
110 	 * introduced after the requirement was made obsolete.
111 	 */
112 
113 	ASSERT(len % 8 == 0 ||
114 	    (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) != 0);
115 
116 	dsp->dsa_err = vn_rdwr(UIO_WRITE, dsp->dsa_vp,
117 	    (caddr_t)buf, len,
118 	    0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);
119 
120 	mutex_enter(&ds->ds_sendstream_lock);
121 	*dsp->dsa_off += len;
122 	mutex_exit(&ds->ds_sendstream_lock);
123 
124 	return (dsp->dsa_err);
125 }
126 
127 /*
128  * For all record types except BEGIN, fill in the checksum (overlaid in
129  * drr_u.drr_checksum.drr_checksum).  The checksum verifies everything
130  * up to the start of the checksum itself.
131  */
132 static int
133 dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
134 {
135 	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
136 	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
137 	(void) fletcher_4_incremental_native(dsp->dsa_drr,
138 	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
139 	    &dsp->dsa_zc);
140 	if (dsp->dsa_drr->drr_type == DRR_BEGIN) {
141 		dsp->dsa_sent_begin = B_TRUE;
142 	} else {
143 		ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
144 		    drr_checksum.drr_checksum));
145 		dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
146 	}
147 	if (dsp->dsa_drr->drr_type == DRR_END) {
148 		dsp->dsa_sent_end = B_TRUE;
149 	}
150 	(void) fletcher_4_incremental_native(&dsp->dsa_drr->
151 	    drr_u.drr_checksum.drr_checksum,
152 	    sizeof (zio_cksum_t), &dsp->dsa_zc);
153 	if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
154 		return (SET_ERROR(EINTR));
155 	if (payload_len != 0) {
156 		(void) fletcher_4_incremental_native(payload, payload_len,
157 		    &dsp->dsa_zc);
158 		if (dump_bytes(dsp, payload, payload_len) != 0)
159 			return (SET_ERROR(EINTR));
160 	}
161 	return (0);
162 }
163 
164 /*
165  * Fill in the drr_free struct, or perform aggregation if the previous record is
166  * also a free record, and the two are adjacent.
167  *
168  * Note that we send free records even for a full send, because we want to be
169  * able to receive a full send as a clone, which requires a list of all the free
170  * and freeobject records that were generated on the source.
171  */
172 static int
173 dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
174     uint64_t length)
175 {
176 	struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);
177 
178 	/*
179 	 * When we receive a free record, dbuf_free_range() assumes
180 	 * that the receiving system doesn't have any dbufs in the range
181 	 * being freed.  This is always true because there is a one-record
182 	 * constraint: we only send one WRITE record for any given
183 	 * object,offset.  We know that the one-record constraint is
184 	 * true because we always send data in increasing order by
185 	 * object,offset.
186 	 *
187 	 * If the increasing-order constraint ever changes, we should find
188 	 * another way to assert that the one-record constraint is still
189 	 * satisfied.
190 	 */
191 	ASSERT(object > dsp->dsa_last_data_object ||
192 	    (object == dsp->dsa_last_data_object &&
193 	    offset > dsp->dsa_last_data_offset));
194 
195 	/*
196 	 * If there is a pending op, but it's not PENDING_FREE, push it out,
197 	 * since free block aggregation can only be done for blocks of the
198 	 * same type (i.e., DRR_FREE records can only be aggregated with
199 	 * other DRR_FREE records.  DRR_FREEOBJECTS records can only be
200 	 * aggregated with other DRR_FREEOBJECTS records.
201 	 */
202 	if (dsp->dsa_pending_op != PENDING_NONE &&
203 	    dsp->dsa_pending_op != PENDING_FREE) {
204 		if (dump_record(dsp, NULL, 0) != 0)
205 			return (SET_ERROR(EINTR));
206 		dsp->dsa_pending_op = PENDING_NONE;
207 	}
208 
209 	if (dsp->dsa_pending_op == PENDING_FREE) {
210 		/*
211 		 * There should never be a PENDING_FREE if length is
212 		 * DMU_OBJECT_END (because dump_dnode is the only place where
213 		 * this function is called with a DMU_OBJECT_END, and only after
214 		 * flushing any pending record).
215 		 */
216 		ASSERT(length != DMU_OBJECT_END);
217 		/*
218 		 * Check to see whether this free block can be aggregated
219 		 * with pending one.
220 		 */
221 		if (drrf->drr_object == object && drrf->drr_offset +
222 		    drrf->drr_length == offset) {
223 			if (offset + length < offset)
224 				drrf->drr_length = DMU_OBJECT_END;
225 			else
226 				drrf->drr_length += length;
227 			return (0);
228 		} else {
229 			/* not a continuation.  Push out pending record */
230 			if (dump_record(dsp, NULL, 0) != 0)
231 				return (SET_ERROR(EINTR));
232 			dsp->dsa_pending_op = PENDING_NONE;
233 		}
234 	}
235 	/* create a FREE record and make it pending */
236 	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
237 	dsp->dsa_drr->drr_type = DRR_FREE;
238 	drrf->drr_object = object;
239 	drrf->drr_offset = offset;
240 	if (offset + length < offset)
241 		drrf->drr_length = DMU_OBJECT_END;
242 	else
243 		drrf->drr_length = length;
244 	drrf->drr_toguid = dsp->dsa_toguid;
245 	if (length == DMU_OBJECT_END) {
246 		if (dump_record(dsp, NULL, 0) != 0)
247 			return (SET_ERROR(EINTR));
248 	} else {
249 		dsp->dsa_pending_op = PENDING_FREE;
250 	}
251 
252 	return (0);
253 }
254 
255 static int
256 dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type, uint64_t object,
257     uint64_t offset, int lsize, int psize, const blkptr_t *bp, void *data)
258 {
259 	uint64_t payload_size;
260 	boolean_t raw = (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW);
261 	struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);
262 
263 	/*
264 	 * We send data in increasing object, offset order.
265 	 * See comment in dump_free() for details.
266 	 */
267 	ASSERT(object > dsp->dsa_last_data_object ||
268 	    (object == dsp->dsa_last_data_object &&
269 	    offset > dsp->dsa_last_data_offset));
270 	dsp->dsa_last_data_object = object;
271 	dsp->dsa_last_data_offset = offset + lsize - 1;
272 
273 	/*
274 	 * If there is any kind of pending aggregation (currently either
275 	 * a grouping of free objects or free blocks), push it out to
276 	 * the stream, since aggregation can't be done across operations
277 	 * of different types.
278 	 */
279 	if (dsp->dsa_pending_op != PENDING_NONE) {
280 		if (dump_record(dsp, NULL, 0) != 0)
281 			return (SET_ERROR(EINTR));
282 		dsp->dsa_pending_op = PENDING_NONE;
283 	}
284 	/* write a WRITE record */
285 	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
286 	dsp->dsa_drr->drr_type = DRR_WRITE;
287 	drrw->drr_object = object;
288 	drrw->drr_type = type;
289 	drrw->drr_offset = offset;
290 	drrw->drr_toguid = dsp->dsa_toguid;
291 	drrw->drr_logical_size = lsize;
292 
293 	/* only set the compression fields if the buf is compressed or raw */
294 	if (raw || lsize != psize) {
295 		ASSERT(!BP_IS_EMBEDDED(bp));
296 		ASSERT3S(psize, >, 0);
297 
298 		if (raw) {
299 			ASSERT(BP_IS_PROTECTED(bp));
300 
301 			/*
302 			 * This is a raw protected block so we need to pass
303 			 * along everything the receiving side will need to
304 			 * interpret this block, including the byteswap, salt,
305 			 * IV, and MAC.
306 			 */
307 			if (BP_SHOULD_BYTESWAP(bp))
308 				drrw->drr_flags |= DRR_RAW_BYTESWAP;
309 			zio_crypt_decode_params_bp(bp, drrw->drr_salt,
310 			    drrw->drr_iv);
311 			zio_crypt_decode_mac_bp(bp, drrw->drr_mac);
312 		} else {
313 			/* this is a compressed block */
314 			ASSERT(dsp->dsa_featureflags &
315 			    DMU_BACKUP_FEATURE_COMPRESSED);
316 			ASSERT(!BP_SHOULD_BYTESWAP(bp));
317 			ASSERT(!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)));
318 			ASSERT3U(BP_GET_COMPRESS(bp), !=, ZIO_COMPRESS_OFF);
319 			ASSERT3S(lsize, >=, psize);
320 		}
321 
322 		/* set fields common to compressed and raw sends */
323 		drrw->drr_compressiontype = BP_GET_COMPRESS(bp);
324 		drrw->drr_compressed_size = psize;
325 		payload_size = drrw->drr_compressed_size;
326 	} else {
327 		payload_size = drrw->drr_logical_size;
328 	}
329 
330 	if (bp == NULL || BP_IS_EMBEDDED(bp) || (BP_IS_PROTECTED(bp) && !raw)) {
331 		/*
332 		 * There's no pre-computed checksum for partial-block writes,
333 		 * embedded BP's, or encrypted BP's that are being sent as
334 		 * plaintext, so (like fletcher4-checkummed blocks) userland
335 		 * will have to compute a dedup-capable checksum itself.
336 		 */
337 		drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
338 	} else {
339 		drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
340 		if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
341 		    ZCHECKSUM_FLAG_DEDUP)
342 			drrw->drr_flags |= DRR_CHECKSUM_DEDUP;
343 		DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
344 		DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
345 		DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
346 		DDK_SET_CRYPT(&drrw->drr_key, BP_IS_PROTECTED(bp));
347 		drrw->drr_key.ddk_cksum = bp->blk_cksum;
348 	}
349 
350 	if (dump_record(dsp, data, payload_size) != 0)
351 		return (SET_ERROR(EINTR));
352 	return (0);
353 }
354 
355 static int
356 dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
357     int blksz, const blkptr_t *bp)
358 {
359 	char buf[BPE_PAYLOAD_SIZE];
360 	struct drr_write_embedded *drrw =
361 	    &(dsp->dsa_drr->drr_u.drr_write_embedded);
362 
363 	if (dsp->dsa_pending_op != PENDING_NONE) {
364 		if (dump_record(dsp, NULL, 0) != 0)
365 			return (EINTR);
366 		dsp->dsa_pending_op = PENDING_NONE;
367 	}
368 
369 	ASSERT(BP_IS_EMBEDDED(bp));
370 
371 	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
372 	dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
373 	drrw->drr_object = object;
374 	drrw->drr_offset = offset;
375 	drrw->drr_length = blksz;
376 	drrw->drr_toguid = dsp->dsa_toguid;
377 	drrw->drr_compression = BP_GET_COMPRESS(bp);
378 	drrw->drr_etype = BPE_GET_ETYPE(bp);
379 	drrw->drr_lsize = BPE_GET_LSIZE(bp);
380 	drrw->drr_psize = BPE_GET_PSIZE(bp);
381 
382 	decode_embedded_bp_compressed(bp, buf);
383 
384 	if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
385 		return (EINTR);
386 	return (0);
387 }
388 
389 static int
390 dump_spill(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t object, void *data)
391 {
392 	struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);
393 	uint64_t blksz = BP_GET_LSIZE(bp);
394 	uint64_t payload_size = blksz;
395 
396 	if (dsp->dsa_pending_op != PENDING_NONE) {
397 		if (dump_record(dsp, NULL, 0) != 0)
398 			return (SET_ERROR(EINTR));
399 		dsp->dsa_pending_op = PENDING_NONE;
400 	}
401 
402 	/* write a SPILL record */
403 	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
404 	dsp->dsa_drr->drr_type = DRR_SPILL;
405 	drrs->drr_object = object;
406 	drrs->drr_length = blksz;
407 	drrs->drr_toguid = dsp->dsa_toguid;
408 
409 	/* See comment in dump_dnode() for full details */
410 	if (zfs_send_unmodified_spill_blocks &&
411 	    (bp->blk_birth <= dsp->dsa_fromtxg)) {
412 		drrs->drr_flags |= DRR_SPILL_UNMODIFIED;
413 	}
414 
415 	/* handle raw send fields */
416 	if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
417 		ASSERT(BP_IS_PROTECTED(bp));
418 
419 		if (BP_SHOULD_BYTESWAP(bp))
420 			drrs->drr_flags |= DRR_RAW_BYTESWAP;
421 		drrs->drr_compressiontype = BP_GET_COMPRESS(bp);
422 		drrs->drr_compressed_size = BP_GET_PSIZE(bp);
423 		zio_crypt_decode_params_bp(bp, drrs->drr_salt, drrs->drr_iv);
424 		zio_crypt_decode_mac_bp(bp, drrs->drr_mac);
425 		payload_size = drrs->drr_compressed_size;
426 	}
427 
428 	if (dump_record(dsp, data, payload_size) != 0)
429 		return (SET_ERROR(EINTR));
430 	return (0);
431 }
432 
433 static int
434 dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
435 {
436 	struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);
437 
438 	/*
439 	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
440 	 * push it out, since free block aggregation can only be done for
441 	 * blocks of the same type (i.e., DRR_FREE records can only be
442 	 * aggregated with other DRR_FREE records.  DRR_FREEOBJECTS records
443 	 * can only be aggregated with other DRR_FREEOBJECTS records.
444 	 */
445 	if (dsp->dsa_pending_op != PENDING_NONE &&
446 	    dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
447 		if (dump_record(dsp, NULL, 0) != 0)
448 			return (SET_ERROR(EINTR));
449 		dsp->dsa_pending_op = PENDING_NONE;
450 	}
451 	if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
452 		/*
453 		 * See whether this free object array can be aggregated
454 		 * with pending one
455 		 */
456 		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
457 			drrfo->drr_numobjs += numobjs;
458 			return (0);
459 		} else {
460 			/* can't be aggregated.  Push out pending record */
461 			if (dump_record(dsp, NULL, 0) != 0)
462 				return (SET_ERROR(EINTR));
463 			dsp->dsa_pending_op = PENDING_NONE;
464 		}
465 	}
466 
467 	/* write a FREEOBJECTS record */
468 	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
469 	dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
470 	drrfo->drr_firstobj = firstobj;
471 	drrfo->drr_numobjs = numobjs;
472 	drrfo->drr_toguid = dsp->dsa_toguid;
473 
474 	dsp->dsa_pending_op = PENDING_FREEOBJECTS;
475 
476 	return (0);
477 }
478 
479 static int
480 dump_dnode(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t object,
481     dnode_phys_t *dnp)
482 {
483 	struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);
484 	int bonuslen;
485 
486 	if (object < dsp->dsa_resume_object) {
487 		/*
488 		 * Note: when resuming, we will visit all the dnodes in
489 		 * the block of dnodes that we are resuming from.  In
490 		 * this case it's unnecessary to send the dnodes prior to
491 		 * the one we are resuming from.  We should be at most one
492 		 * block's worth of dnodes behind the resume point.
493 		 */
494 		ASSERT3U(dsp->dsa_resume_object - object, <,
495 		    1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
496 		return (0);
497 	}
498 
499 	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
500 		return (dump_freeobjects(dsp, object, 1));
501 
502 	if (dsp->dsa_pending_op != PENDING_NONE) {
503 		if (dump_record(dsp, NULL, 0) != 0)
504 			return (SET_ERROR(EINTR));
505 		dsp->dsa_pending_op = PENDING_NONE;
506 	}
507 
508 	/* write an OBJECT record */
509 	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
510 	dsp->dsa_drr->drr_type = DRR_OBJECT;
511 	drro->drr_object = object;
512 	drro->drr_type = dnp->dn_type;
513 	drro->drr_bonustype = dnp->dn_bonustype;
514 	drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
515 	drro->drr_bonuslen = dnp->dn_bonuslen;
516 	drro->drr_dn_slots = dnp->dn_extra_slots + 1;
517 	drro->drr_checksumtype = dnp->dn_checksum;
518 	drro->drr_compress = dnp->dn_compress;
519 	drro->drr_toguid = dsp->dsa_toguid;
520 
521 	if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
522 	    drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
523 		drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;
524 
525 	bonuslen = P2ROUNDUP(dnp->dn_bonuslen, 8);
526 
527 	if ((dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW)) {
528 		ASSERT(BP_IS_ENCRYPTED(bp));
529 
530 		if (BP_SHOULD_BYTESWAP(bp))
531 			drro->drr_flags |= DRR_RAW_BYTESWAP;
532 
533 		/* needed for reconstructing dnp on recv side */
534 		drro->drr_maxblkid = dnp->dn_maxblkid;
535 		drro->drr_indblkshift = dnp->dn_indblkshift;
536 		drro->drr_nlevels = dnp->dn_nlevels;
537 		drro->drr_nblkptr = dnp->dn_nblkptr;
538 
539 		/*
540 		 * Since we encrypt the entire bonus area, the (raw) part
541 		 * beyond the bonuslen is actually nonzero, so we need
542 		 * to send it.
543 		 */
544 		if (bonuslen != 0) {
545 			drro->drr_raw_bonuslen = DN_MAX_BONUS_LEN(dnp);
546 			bonuslen = drro->drr_raw_bonuslen;
547 		}
548 	}
549 
550 	/*
551 	 * DRR_OBJECT_SPILL is set for every dnode which references a
552 	 * spill block.  This allows the receiving pool to definitively
553 	 * determine when a spill block should be kept or freed.
554 	 */
555 	if (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR)
556 		drro->drr_flags |= DRR_OBJECT_SPILL;
557 
558 	if (dump_record(dsp, DN_BONUS(dnp), bonuslen) != 0)
559 		return (SET_ERROR(EINTR));
560 
561 	/* Free anything past the end of the file. */
562 	if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
563 	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), DMU_OBJECT_END) != 0)
564 		return (SET_ERROR(EINTR));
565 
566 	/*
567 	 * Send DRR_SPILL records for unmodified spill blocks.  This is useful
568 	 * because changing certain attributes of the object (e.g. blocksize)
569 	 * can cause old versions of ZFS to incorrectly remove a spill block.
570 	 * Including these records in the stream forces an up to date version
571 	 * to always be written ensuring they're never lost.  Current versions
572 	 * of the code which understand the DRR_FLAG_SPILL_BLOCK feature can
573 	 * ignore these unmodified spill blocks.
574 	 */
575 	if (zfs_send_unmodified_spill_blocks &&
576 	    (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR) &&
577 	    (DN_SPILL_BLKPTR(dnp)->blk_birth <= dsp->dsa_fromtxg)) {
578 		struct send_block_record record;
579 
580 		bzero(&record, sizeof (struct send_block_record));
581 		record.eos_marker = B_FALSE;
582 		record.bp = *DN_SPILL_BLKPTR(dnp);
583 		SET_BOOKMARK(&(record.zb), dmu_objset_id(dsp->dsa_os),
584 		    object, 0, DMU_SPILL_BLKID);
585 
586 		if (do_dump(dsp, &record) != 0)
587 			return (SET_ERROR(EINTR));
588 	}
589 
590 	if (dsp->dsa_err != 0)
591 		return (SET_ERROR(EINTR));
592 	return (0);
593 }
594 
595 static int
596 dump_object_range(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t firstobj,
597     uint64_t numslots)
598 {
599 	struct drr_object_range *drror =
600 	    &(dsp->dsa_drr->drr_u.drr_object_range);
601 
602 	/* we only use this record type for raw sends */
603 	ASSERT(BP_IS_PROTECTED(bp));
604 	ASSERT(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW);
605 	ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF);
606 	ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_DNODE);
607 	ASSERT0(BP_GET_LEVEL(bp));
608 
609 	if (dsp->dsa_pending_op != PENDING_NONE) {
610 		if (dump_record(dsp, NULL, 0) != 0)
611 			return (SET_ERROR(EINTR));
612 		dsp->dsa_pending_op = PENDING_NONE;
613 	}
614 
615 	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
616 	dsp->dsa_drr->drr_type = DRR_OBJECT_RANGE;
617 	drror->drr_firstobj = firstobj;
618 	drror->drr_numslots = numslots;
619 	drror->drr_toguid = dsp->dsa_toguid;
620 	if (BP_SHOULD_BYTESWAP(bp))
621 		drror->drr_flags |= DRR_RAW_BYTESWAP;
622 	zio_crypt_decode_params_bp(bp, drror->drr_salt, drror->drr_iv);
623 	zio_crypt_decode_mac_bp(bp, drror->drr_mac);
624 
625 	if (dump_record(dsp, NULL, 0) != 0)
626 		return (SET_ERROR(EINTR));
627 	return (0);
628 }
629 
630 static boolean_t
631 backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
632 {
633 	if (!BP_IS_EMBEDDED(bp))
634 		return (B_FALSE);
635 
636 	/*
637 	 * Compression function must be legacy, or explicitly enabled.
638 	 */
639 	if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
640 	    !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LZ4)))
641 		return (B_FALSE);
642 
643 	/*
644 	 * Embed type must be explicitly enabled.
645 	 */
646 	switch (BPE_GET_ETYPE(bp)) {
647 	case BP_EMBEDDED_TYPE_DATA:
648 		if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
649 			return (B_TRUE);
650 		break;
651 	default:
652 		return (B_FALSE);
653 	}
654 	return (B_FALSE);
655 }
656 
657 /*
658  * This is the callback function to traverse_dataset that acts as the worker
659  * thread for dmu_send_impl.
660  */
661 /*ARGSUSED*/
662 static int
663 send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
664     const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
665 {
666 	struct send_thread_arg *sta = arg;
667 	struct send_block_record *record;
668 	uint64_t record_size;
669 	int err = 0;
670 
671 	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
672 	    zb->zb_object >= sta->resume.zb_object);
673 	ASSERT3P(sta->ds, !=, NULL);
674 
675 	if (sta->cancel)
676 		return (SET_ERROR(EINTR));
677 
678 	if (bp == NULL) {
679 		ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
680 		return (0);
681 	} else if (zb->zb_level < 0) {
682 		return (0);
683 	}
684 
685 	record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
686 	record->eos_marker = B_FALSE;
687 	record->bp = *bp;
688 	record->zb = *zb;
689 	record->indblkshift = dnp->dn_indblkshift;
690 	record->datablkszsec = dnp->dn_datablkszsec;
691 	record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
692 	bqueue_enqueue(&sta->q, record, record_size);
693 
694 	return (err);
695 }
696 
697 /*
698  * This function kicks off the traverse_dataset.  It also handles setting the
699  * error code of the thread in case something goes wrong, and pushes the End of
700  * Stream record when the traverse_dataset call has finished.  If there is no
701  * dataset to traverse, the thread immediately pushes End of Stream marker.
702  */
703 static void
704 send_traverse_thread(void *arg)
705 {
706 	struct send_thread_arg *st_arg = arg;
707 	int err;
708 	struct send_block_record *data;
709 
710 	if (st_arg->ds != NULL) {
711 		err = traverse_dataset_resume(st_arg->ds,
712 		    st_arg->fromtxg, &st_arg->resume,
713 		    st_arg->flags, send_cb, st_arg);
714 
715 		if (err != EINTR)
716 			st_arg->error_code = err;
717 	}
718 	data = kmem_zalloc(sizeof (*data), KM_SLEEP);
719 	data->eos_marker = B_TRUE;
720 	bqueue_enqueue(&st_arg->q, data, 1);
721 	thread_exit();
722 }
723 
724 /*
725  * This function actually handles figuring out what kind of record needs to be
726  * dumped, reading the data (which has hopefully been prefetched), and calling
727  * the appropriate helper function.
728  */
729 static int
730 do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
731 {
732 	dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
733 	const blkptr_t *bp = &data->bp;
734 	const zbookmark_phys_t *zb = &data->zb;
735 	uint8_t indblkshift = data->indblkshift;
736 	uint16_t dblkszsec = data->datablkszsec;
737 	spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
738 	dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
739 	int err = 0;
740 
741 	ASSERT3U(zb->zb_level, >=, 0);
742 
743 	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
744 	    zb->zb_object >= dsa->dsa_resume_object);
745 
746 	/*
747 	 * All bps of an encrypted os should have the encryption bit set.
748 	 * If this is not true it indicates tampering and we report an error.
749 	 */
750 	if (dsa->dsa_os->os_encrypted &&
751 	    !BP_IS_HOLE(bp) && !BP_USES_CRYPT(bp)) {
752 		spa_log_error(spa, zb);
753 		zfs_panic_recover("unencrypted block in encrypted "
754 		    "object set %llu", ds->ds_object);
755 		return (SET_ERROR(EIO));
756 	}
757 
758 	if (zb->zb_object != DMU_META_DNODE_OBJECT &&
759 	    DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
760 		return (0);
761 	} else if (BP_IS_HOLE(bp) &&
762 	    zb->zb_object == DMU_META_DNODE_OBJECT) {
763 		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
764 		uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
765 		err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
766 	} else if (BP_IS_HOLE(bp)) {
767 		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
768 		uint64_t offset = zb->zb_blkid * span;
769 		/* Don't dump free records for offsets > DMU_OBJECT_END */
770 		if (zb->zb_blkid == 0 || span <= DMU_OBJECT_END / zb->zb_blkid)
771 			err = dump_free(dsa, zb->zb_object, offset, span);
772 	} else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
773 		return (0);
774 	} else if (type == DMU_OT_DNODE) {
775 		int epb = BP_GET_LSIZE(bp) >> DNODE_SHIFT;
776 		arc_flags_t aflags = ARC_FLAG_WAIT;
777 		arc_buf_t *abuf;
778 		enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
779 
780 		if (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
781 			ASSERT(BP_IS_ENCRYPTED(bp));
782 			ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF);
783 			zioflags |= ZIO_FLAG_RAW;
784 		}
785 
786 		ASSERT0(zb->zb_level);
787 
788 		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
789 		    ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0)
790 			return (SET_ERROR(EIO));
791 
792 		dnode_phys_t *blk = abuf->b_data;
793 		uint64_t dnobj = zb->zb_blkid * epb;
794 
795 		/*
796 		 * Raw sends require sending encryption parameters for the
797 		 * block of dnodes. Regular sends do not need to send this
798 		 * info.
799 		 */
800 		if (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
801 			ASSERT(arc_is_encrypted(abuf));
802 			err = dump_object_range(dsa, bp, dnobj, epb);
803 		}
804 
805 		if (err == 0) {
806 			for (int i = 0; i < epb;
807 			    i += blk[i].dn_extra_slots + 1) {
808 				err = dump_dnode(dsa, bp, dnobj + i, blk + i);
809 				if (err != 0)
810 					break;
811 			}
812 		}
813 		arc_buf_destroy(abuf, &abuf);
814 	} else if (type == DMU_OT_SA) {
815 		arc_flags_t aflags = ARC_FLAG_WAIT;
816 		arc_buf_t *abuf;
817 		enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
818 
819 		if (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
820 			ASSERT(BP_IS_PROTECTED(bp));
821 			zioflags |= ZIO_FLAG_RAW;
822 		}
823 
824 		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
825 		    ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0)
826 			return (SET_ERROR(EIO));
827 
828 		err = dump_spill(dsa, bp, zb->zb_object, abuf->b_data);
829 		arc_buf_destroy(abuf, &abuf);
830 	} else if (backup_do_embed(dsa, bp)) {
831 		/* it's an embedded level-0 block of a regular object */
832 		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
833 		ASSERT0(zb->zb_level);
834 		err = dump_write_embedded(dsa, zb->zb_object,
835 		    zb->zb_blkid * blksz, blksz, bp);
836 	} else {
837 		/* it's a level-0 block of a regular object */
838 		arc_flags_t aflags = ARC_FLAG_WAIT;
839 		arc_buf_t *abuf;
840 		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
841 		uint64_t offset;
842 
843 		/*
844 		 * If we have large blocks stored on disk but the send flags
845 		 * don't allow us to send large blocks, we split the data from
846 		 * the arc buf into chunks.
847 		 */
848 		boolean_t split_large_blocks = blksz > SPA_OLD_MAXBLOCKSIZE &&
849 		    !(dsa->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
850 
851 		/*
852 		 * Raw sends require that we always get raw data as it exists
853 		 * on disk, so we assert that we are not splitting blocks here.
854 		 */
855 		boolean_t request_raw =
856 		    (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) != 0;
857 
858 		/*
859 		 * We should only request compressed data from the ARC if all
860 		 * the following are true:
861 		 *  - stream compression was requested
862 		 *  - we aren't splitting large blocks into smaller chunks
863 		 *  - the data won't need to be byteswapped before sending
864 		 *  - this isn't an embedded block
865 		 *  - this isn't metadata (if receiving on a different endian
866 		 *    system it can be byteswapped more easily)
867 		 */
868 		boolean_t request_compressed =
869 		    (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
870 		    !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
871 		    !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
872 
873 		IMPLY(request_raw, !split_large_blocks);
874 		IMPLY(request_raw, BP_IS_PROTECTED(bp));
875 		ASSERT0(zb->zb_level);
876 		ASSERT(zb->zb_object > dsa->dsa_resume_object ||
877 		    (zb->zb_object == dsa->dsa_resume_object &&
878 		    zb->zb_blkid * blksz >= dsa->dsa_resume_offset));
879 
880 		ASSERT0(zb->zb_level);
881 		ASSERT(zb->zb_object > dsa->dsa_resume_object ||
882 		    (zb->zb_object == dsa->dsa_resume_object &&
883 		    zb->zb_blkid * blksz >= dsa->dsa_resume_offset));
884 
885 		ASSERT3U(blksz, ==, BP_GET_LSIZE(bp));
886 
887 		enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
888 		if (request_raw)
889 			zioflags |= ZIO_FLAG_RAW;
890 		else if (request_compressed)
891 			zioflags |= ZIO_FLAG_RAW_COMPRESS;
892 
893 		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
894 		    ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0) {
895 			if (zfs_send_corrupt_data) {
896 				/* Send a block filled with 0x"zfs badd bloc" */
897 				abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
898 				    blksz);
899 				uint64_t *ptr;
900 				for (ptr = abuf->b_data;
901 				    (char *)ptr < (char *)abuf->b_data + blksz;
902 				    ptr++)
903 					*ptr = 0x2f5baddb10cULL;
904 			} else {
905 				return (SET_ERROR(EIO));
906 			}
907 		}
908 
909 		offset = zb->zb_blkid * blksz;
910 
911 		if (split_large_blocks) {
912 			ASSERT0(arc_is_encrypted(abuf));
913 			ASSERT3U(arc_get_compression(abuf), ==,
914 			    ZIO_COMPRESS_OFF);
915 			char *buf = abuf->b_data;
916 			while (blksz > 0 && err == 0) {
917 				int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
918 				err = dump_write(dsa, type, zb->zb_object,
919 				    offset, n, n, NULL, buf);
920 				offset += n;
921 				buf += n;
922 				blksz -= n;
923 			}
924 		} else {
925 			err = dump_write(dsa, type, zb->zb_object, offset,
926 			    blksz, arc_buf_size(abuf), bp, abuf->b_data);
927 		}
928 		arc_buf_destroy(abuf, &abuf);
929 	}
930 
931 	ASSERT(err == 0 || err == EINTR);
932 	return (err);
933 }
934 
935 /*
936  * Pop the new data off the queue, and free the old data.
937  */
938 static struct send_block_record *
939 get_next_record(bqueue_t *bq, struct send_block_record *data)
940 {
941 	struct send_block_record *tmp = bqueue_dequeue(bq);
942 	kmem_free(data, sizeof (*data));
943 	return (tmp);
944 }
945 
946 /*
947  * Actually do the bulk of the work in a zfs send.
948  *
949  * Note: Releases dp using the specified tag.
950  */
951 static int
952 dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
953     zfs_bookmark_phys_t *ancestor_zb, boolean_t is_clone,
954     boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
955     boolean_t rawok, int outfd, uint64_t resumeobj, uint64_t resumeoff,
956     vnode_t *vp, offset_t *off)
957 {
958 	objset_t *os;
959 	dmu_replay_record_t *drr;
960 	dmu_sendarg_t *dsp;
961 	int err;
962 	uint64_t fromtxg = 0;
963 	uint64_t featureflags = 0;
964 	struct send_thread_arg to_arg = { 0 };
965 
966 	err = dmu_objset_from_ds(to_ds, &os);
967 	if (err != 0) {
968 		dsl_pool_rele(dp, tag);
969 		return (err);
970 	}
971 
972 	/*
973 	 * If this is a non-raw send of an encrypted ds, we can ensure that
974 	 * the objset_phys_t is authenticated. This is safe because this is
975 	 * either a snapshot or we have owned the dataset, ensuring that
976 	 * it can't be modified.
977 	 */
978 	if (!rawok && os->os_encrypted &&
979 	    arc_is_unauthenticated(os->os_phys_buf)) {
980 		zbookmark_phys_t zb;
981 
982 		SET_BOOKMARK(&zb, to_ds->ds_object, ZB_ROOT_OBJECT,
983 		    ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
984 		err = arc_untransform(os->os_phys_buf, os->os_spa,
985 		    &zb, B_FALSE);
986 		if (err != 0) {
987 			dsl_pool_rele(dp, tag);
988 			return (err);
989 		}
990 
991 		ASSERT0(arc_is_unauthenticated(os->os_phys_buf));
992 	}
993 
994 	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
995 	drr->drr_type = DRR_BEGIN;
996 	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
997 	DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
998 	    DMU_SUBSTREAM);
999 
1000 #ifdef _KERNEL
1001 	if (dmu_objset_type(os) == DMU_OST_ZFS) {
1002 		uint64_t version;
1003 		if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
1004 			kmem_free(drr, sizeof (dmu_replay_record_t));
1005 			dsl_pool_rele(dp, tag);
1006 			return (SET_ERROR(EINVAL));
1007 		}
1008 		if (version >= ZPL_VERSION_SA) {
1009 			featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
1010 		}
1011 	}
1012 #endif
1013 
1014 	/* raw sends imply large_block_ok */
1015 	if ((large_block_ok || rawok) &&
1016 	    to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_BLOCKS])
1017 		featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
1018 	if (to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_DNODE])
1019 		featureflags |= DMU_BACKUP_FEATURE_LARGE_DNODE;
1020 
1021 	/* encrypted datasets will not have embedded blocks */
1022 	if ((embedok || rawok) && !os->os_encrypted &&
1023 	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
1024 		featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
1025 	}
1026 
1027 	/* raw send implies compressok */
1028 	if (compressok || rawok)
1029 		featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;
1030 	if (rawok && os->os_encrypted)
1031 		featureflags |= DMU_BACKUP_FEATURE_RAW;
1032 
1033 	if ((featureflags &
1034 	    (DMU_BACKUP_FEATURE_EMBED_DATA | DMU_BACKUP_FEATURE_COMPRESSED |
1035 	    DMU_BACKUP_FEATURE_RAW)) != 0 &&
1036 	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) {
1037 		featureflags |= DMU_BACKUP_FEATURE_LZ4;
1038 	}
1039 
1040 	if (resumeobj != 0 || resumeoff != 0) {
1041 		featureflags |= DMU_BACKUP_FEATURE_RESUMING;
1042 	}
1043 
1044 	DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
1045 	    featureflags);
1046 
1047 	drr->drr_u.drr_begin.drr_creation_time =
1048 	    dsl_dataset_phys(to_ds)->ds_creation_time;
1049 	drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
1050 	if (is_clone)
1051 		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
1052 	drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
1053 	if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
1054 		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
1055 	if (zfs_send_set_freerecords_bit)
1056 		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_FREERECORDS;
1057 
1058 	drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_SPILL_BLOCK;
1059 
1060 	if (ancestor_zb != NULL) {
1061 		drr->drr_u.drr_begin.drr_fromguid =
1062 		    ancestor_zb->zbm_guid;
1063 		fromtxg = ancestor_zb->zbm_creation_txg;
1064 	}
1065 	dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
1066 	if (!to_ds->ds_is_snapshot) {
1067 		(void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
1068 		    sizeof (drr->drr_u.drr_begin.drr_toname));
1069 	}
1070 
1071 	dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);
1072 
1073 	dsp->dsa_drr = drr;
1074 	dsp->dsa_vp = vp;
1075 	dsp->dsa_outfd = outfd;
1076 	dsp->dsa_proc = curproc;
1077 	dsp->dsa_os = os;
1078 	dsp->dsa_off = off;
1079 	dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
1080 	dsp->dsa_fromtxg = fromtxg;
1081 	dsp->dsa_pending_op = PENDING_NONE;
1082 	dsp->dsa_featureflags = featureflags;
1083 	dsp->dsa_resume_object = resumeobj;
1084 	dsp->dsa_resume_offset = resumeoff;
1085 
1086 	mutex_enter(&to_ds->ds_sendstream_lock);
1087 	list_insert_head(&to_ds->ds_sendstreams, dsp);
1088 	mutex_exit(&to_ds->ds_sendstream_lock);
1089 
1090 	dsl_dataset_long_hold(to_ds, FTAG);
1091 	dsl_pool_rele(dp, tag);
1092 
1093 	void *payload = NULL;
1094 	size_t payload_len = 0;
1095 	/* handle features that require a DRR_BEGIN payload */
1096 	if (featureflags &
1097 	    (DMU_BACKUP_FEATURE_RESUMING | DMU_BACKUP_FEATURE_RAW)) {
1098 		nvlist_t *keynvl = NULL;
1099 		nvlist_t *nvl = fnvlist_alloc();
1100 
1101 		if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
1102 			dmu_object_info_t to_doi;
1103 			err = dmu_object_info(os, resumeobj, &to_doi);
1104 			if (err != 0) {
1105 				fnvlist_free(nvl);
1106 				goto out;
1107 			}
1108 
1109 			SET_BOOKMARK(&to_arg.resume, to_ds->ds_object,
1110 			    resumeobj, 0,
1111 			    resumeoff / to_doi.doi_data_block_size);
1112 
1113 			fnvlist_add_uint64(nvl, "resume_object", resumeobj);
1114 			fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
1115 		}
1116 
1117 		if (featureflags & DMU_BACKUP_FEATURE_RAW) {
1118 			uint64_t ivset_guid = (ancestor_zb != NULL) ?
1119 			    ancestor_zb->zbm_ivset_guid : 0;
1120 
1121 			ASSERT(os->os_encrypted);
1122 
1123 			err = dsl_crypto_populate_key_nvlist(to_ds,
1124 			    ivset_guid, &keynvl);
1125 			if (err != 0) {
1126 				fnvlist_free(nvl);
1127 				goto out;
1128 			}
1129 
1130 			fnvlist_add_nvlist(nvl, "crypt_keydata", keynvl);
1131 		}
1132 
1133 		payload = fnvlist_pack(nvl, &payload_len);
1134 		drr->drr_payloadlen = payload_len;
1135 		fnvlist_free(keynvl);
1136 		fnvlist_free(nvl);
1137 	}
1138 
1139 	err = dump_record(dsp, payload, payload_len);
1140 	fnvlist_pack_free(payload, payload_len);
1141 	if (err != 0) {
1142 		err = dsp->dsa_err;
1143 		goto out;
1144 	}
1145 
1146 	err = bqueue_init(&to_arg.q,
1147 	    MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
1148 	    offsetof(struct send_block_record, ln));
1149 	to_arg.error_code = 0;
1150 	to_arg.cancel = B_FALSE;
1151 	to_arg.ds = to_ds;
1152 	to_arg.fromtxg = fromtxg;
1153 	to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
1154 	if (rawok)
1155 		to_arg.flags |= TRAVERSE_NO_DECRYPT;
1156 	(void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, curproc,
1157 	    TS_RUN, minclsyspri);
1158 
1159 	struct send_block_record *to_data;
1160 	to_data = bqueue_dequeue(&to_arg.q);
1161 
1162 	while (!to_data->eos_marker && err == 0) {
1163 		err = do_dump(dsp, to_data);
1164 		to_data = get_next_record(&to_arg.q, to_data);
1165 		if (issig(JUSTLOOKING) && issig(FORREAL))
1166 			err = EINTR;
1167 	}
1168 
1169 	if (err != 0) {
1170 		to_arg.cancel = B_TRUE;
1171 		while (!to_data->eos_marker) {
1172 			to_data = get_next_record(&to_arg.q, to_data);
1173 		}
1174 	}
1175 	kmem_free(to_data, sizeof (*to_data));
1176 
1177 	bqueue_destroy(&to_arg.q);
1178 
1179 	if (err == 0 && to_arg.error_code != 0)
1180 		err = to_arg.error_code;
1181 
1182 	if (err != 0)
1183 		goto out;
1184 
1185 	if (dsp->dsa_pending_op != PENDING_NONE)
1186 		if (dump_record(dsp, NULL, 0) != 0)
1187 			err = SET_ERROR(EINTR);
1188 
1189 	if (err != 0) {
1190 		if (err == EINTR && dsp->dsa_err != 0)
1191 			err = dsp->dsa_err;
1192 		goto out;
1193 	}
1194 
1195 	bzero(drr, sizeof (dmu_replay_record_t));
1196 	drr->drr_type = DRR_END;
1197 	drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
1198 	drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;
1199 
1200 	if (dump_record(dsp, NULL, 0) != 0)
1201 		err = dsp->dsa_err;
1202 out:
1203 	mutex_enter(&to_ds->ds_sendstream_lock);
1204 	list_remove(&to_ds->ds_sendstreams, dsp);
1205 	mutex_exit(&to_ds->ds_sendstream_lock);
1206 
1207 	VERIFY(err != 0 || (dsp->dsa_sent_begin && dsp->dsa_sent_end));
1208 
1209 	kmem_free(drr, sizeof (dmu_replay_record_t));
1210 	kmem_free(dsp, sizeof (dmu_sendarg_t));
1211 
1212 	dsl_dataset_long_rele(to_ds, FTAG);
1213 
1214 	return (err);
1215 }
1216 
1217 int
1218 dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
1219     boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
1220     boolean_t rawok, int outfd, vnode_t *vp, offset_t *off)
1221 {
1222 	dsl_pool_t *dp;
1223 	dsl_dataset_t *ds;
1224 	dsl_dataset_t *fromds = NULL;
1225 	ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
1226 	int err;
1227 
1228 	err = dsl_pool_hold(pool, FTAG, &dp);
1229 	if (err != 0)
1230 		return (err);
1231 
1232 	err = dsl_dataset_hold_obj_flags(dp, tosnap, dsflags, FTAG, &ds);
1233 	if (err != 0) {
1234 		dsl_pool_rele(dp, FTAG);
1235 		return (err);
1236 	}
1237 
1238 	if (fromsnap != 0) {
1239 		zfs_bookmark_phys_t zb = { 0 };
1240 		boolean_t is_clone;
1241 
1242 		err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
1243 		if (err != 0) {
1244 			dsl_dataset_rele_flags(ds, dsflags, FTAG);
1245 			dsl_pool_rele(dp, FTAG);
1246 			return (err);
1247 		}
1248 		if (!dsl_dataset_is_before(ds, fromds, 0)) {
1249 			err = SET_ERROR(EXDEV);
1250 			dsl_dataset_rele(fromds, FTAG);
1251 			dsl_dataset_rele_flags(ds, dsflags, FTAG);
1252 			dsl_pool_rele(dp, FTAG);
1253 			return (err);
1254 		}
1255 
1256 		zb.zbm_creation_time =
1257 		    dsl_dataset_phys(fromds)->ds_creation_time;
1258 		zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
1259 		zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
1260 
1261 		if (dsl_dataset_is_zapified(fromds)) {
1262 			(void) zap_lookup(dp->dp_meta_objset,
1263 			    fromds->ds_object, DS_FIELD_IVSET_GUID, 8, 1,
1264 			    &zb.zbm_ivset_guid);
1265 		}
1266 
1267 		is_clone = (fromds->ds_dir != ds->ds_dir);
1268 		dsl_dataset_rele(fromds, FTAG);
1269 		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
1270 		    embedok, large_block_ok, compressok, rawok, outfd,
1271 		    0, 0, vp, off);
1272 	} else {
1273 		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
1274 		    embedok, large_block_ok, compressok, rawok, outfd,
1275 		    0, 0, vp, off);
1276 	}
1277 	dsl_dataset_rele_flags(ds, dsflags, FTAG);
1278 	return (err);
1279 }
1280 
1281 int
1282 dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
1283     boolean_t large_block_ok, boolean_t compressok, boolean_t rawok,
1284     int outfd, uint64_t resumeobj, uint64_t resumeoff, vnode_t *vp,
1285     offset_t *off)
1286 {
1287 	dsl_pool_t *dp;
1288 	dsl_dataset_t *ds;
1289 	int err;
1290 	ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
1291 	boolean_t owned = B_FALSE;
1292 
1293 	if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
1294 		return (SET_ERROR(EINVAL));
1295 
1296 	err = dsl_pool_hold(tosnap, FTAG, &dp);
1297 	if (err != 0)
1298 		return (err);
1299 	if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
1300 		/*
1301 		 * We are sending a filesystem or volume.  Ensure
1302 		 * that it doesn't change by owning the dataset.
1303 		 */
1304 		err = dsl_dataset_own(dp, tosnap, dsflags, FTAG, &ds);
1305 		owned = B_TRUE;
1306 	} else {
1307 		err = dsl_dataset_hold_flags(dp, tosnap, dsflags, FTAG, &ds);
1308 	}
1309 	if (err != 0) {
1310 		dsl_pool_rele(dp, FTAG);
1311 		return (err);
1312 	}
1313 
1314 	if (fromsnap != NULL) {
1315 		zfs_bookmark_phys_t zb = { 0 };
1316 		boolean_t is_clone = B_FALSE;
1317 		int fsnamelen = strchr(tosnap, '@') - tosnap;
1318 
1319 		/*
1320 		 * If the fromsnap is in a different filesystem, then
1321 		 * mark the send stream as a clone.
1322 		 */
1323 		if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
1324 		    (fromsnap[fsnamelen] != '@' &&
1325 		    fromsnap[fsnamelen] != '#')) {
1326 			is_clone = B_TRUE;
1327 		}
1328 
1329 		if (strchr(fromsnap, '@')) {
1330 			dsl_dataset_t *fromds;
1331 			err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
1332 			if (err == 0) {
1333 				if (!dsl_dataset_is_before(ds, fromds, 0))
1334 					err = SET_ERROR(EXDEV);
1335 				zb.zbm_creation_time =
1336 				    dsl_dataset_phys(fromds)->ds_creation_time;
1337 				zb.zbm_creation_txg =
1338 				    dsl_dataset_phys(fromds)->ds_creation_txg;
1339 				zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
1340 				is_clone = (ds->ds_dir != fromds->ds_dir);
1341 
1342 				if (dsl_dataset_is_zapified(fromds)) {
1343 					(void) zap_lookup(dp->dp_meta_objset,
1344 					    fromds->ds_object,
1345 					    DS_FIELD_IVSET_GUID, 8, 1,
1346 					    &zb.zbm_ivset_guid);
1347 				}
1348 				dsl_dataset_rele(fromds, FTAG);
1349 			}
1350 		} else {
1351 			err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
1352 		}
1353 		if (err != 0) {
1354 			if (owned)
1355 				dsl_dataset_disown(ds, dsflags, FTAG);
1356 			else
1357 				dsl_dataset_rele_flags(ds, dsflags, FTAG);
1358 
1359 			dsl_pool_rele(dp, FTAG);
1360 			return (err);
1361 		}
1362 		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
1363 		    embedok, large_block_ok, compressok, rawok,
1364 		    outfd, resumeobj, resumeoff, vp, off);
1365 	} else {
1366 		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
1367 		    embedok, large_block_ok, compressok, rawok,
1368 		    outfd, resumeobj, resumeoff, vp, off);
1369 	}
1370 	if (owned)
1371 		dsl_dataset_disown(ds, dsflags, FTAG);
1372 	else
1373 		dsl_dataset_rele_flags(ds, dsflags, FTAG);
1374 
1375 	return (err);
1376 }
1377 
1378 static int
1379 dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed,
1380     uint64_t compressed, boolean_t stream_compressed, uint64_t *sizep)
1381 {
1382 	int err = 0;
1383 	uint64_t size;
1384 	/*
1385 	 * Assume that space (both on-disk and in-stream) is dominated by
1386 	 * data.  We will adjust for indirect blocks and the copies property,
1387 	 * but ignore per-object space used (eg, dnodes and DRR_OBJECT records).
1388 	 */
1389 	uint64_t recordsize;
1390 	uint64_t record_count;
1391 	objset_t *os;
1392 	VERIFY0(dmu_objset_from_ds(ds, &os));
1393 
1394 	/* Assume all (uncompressed) blocks are recordsize. */
1395 	if (zfs_override_estimate_recordsize != 0) {
1396 		recordsize = zfs_override_estimate_recordsize;
1397 	} else if (os->os_phys->os_type == DMU_OST_ZVOL) {
1398 		err = dsl_prop_get_int_ds(ds,
1399 		    zfs_prop_to_name(ZFS_PROP_VOLBLOCKSIZE), &recordsize);
1400 	} else {
1401 		err = dsl_prop_get_int_ds(ds,
1402 		    zfs_prop_to_name(ZFS_PROP_RECORDSIZE), &recordsize);
1403 	}
1404 	if (err != 0)
1405 		return (err);
1406 	record_count = uncompressed / recordsize;
1407 
1408 	/*
1409 	 * If we're estimating a send size for a compressed stream, use the
1410 	 * compressed data size to estimate the stream size. Otherwise, use the
1411 	 * uncompressed data size.
1412 	 */
1413 	size = stream_compressed ? compressed : uncompressed;
1414 
1415 	/*
1416 	 * Subtract out approximate space used by indirect blocks.
1417 	 * Assume most space is used by data blocks (non-indirect, non-dnode).
1418 	 * Assume no ditto blocks or internal fragmentation.
1419 	 *
1420 	 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
1421 	 * block.
1422 	 */
1423 	size -= record_count * sizeof (blkptr_t);
1424 
1425 	/* Add in the space for the record associated with each block. */
1426 	size += record_count * sizeof (dmu_replay_record_t);
1427 
1428 	*sizep = size;
1429 
1430 	return (0);
1431 }
1432 
1433 int
1434 dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds,
1435     boolean_t stream_compressed, uint64_t *sizep)
1436 {
1437 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1438 	int err;
1439 	uint64_t uncomp, comp;
1440 
1441 	ASSERT(dsl_pool_config_held(dp));
1442 
1443 	/* tosnap must be a snapshot */
1444 	if (!ds->ds_is_snapshot)
1445 		return (SET_ERROR(EINVAL));
1446 
1447 	/* fromsnap, if provided, must be a snapshot */
1448 	if (fromds != NULL && !fromds->ds_is_snapshot)
1449 		return (SET_ERROR(EINVAL));
1450 
1451 	/*
1452 	 * fromsnap must be an earlier snapshot from the same fs as tosnap,
1453 	 * or the origin's fs.
1454 	 */
1455 	if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
1456 		return (SET_ERROR(EXDEV));
1457 
1458 	/* Get compressed and uncompressed size estimates of changed data. */
1459 	if (fromds == NULL) {
1460 		uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
1461 		comp = dsl_dataset_phys(ds)->ds_compressed_bytes;
1462 	} else {
1463 		uint64_t used;
1464 		err = dsl_dataset_space_written(fromds, ds,
1465 		    &used, &comp, &uncomp);
1466 		if (err != 0)
1467 			return (err);
1468 	}
1469 
1470 	err = dmu_adjust_send_estimate_for_indirects(ds, uncomp, comp,
1471 	    stream_compressed, sizep);
1472 	/*
1473 	 * Add the size of the BEGIN and END records to the estimate.
1474 	 */
1475 	*sizep += 2 * sizeof (dmu_replay_record_t);
1476 	return (err);
1477 }
1478 
1479 struct calculate_send_arg {
1480 	uint64_t uncompressed;
1481 	uint64_t compressed;
1482 };
1483 
1484 /*
1485  * Simple callback used to traverse the blocks of a snapshot and sum their
1486  * uncompressed and compressed sizes.
1487  */
1488 /* ARGSUSED */
1489 static int
1490 dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
1491     const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
1492 {
1493 	struct calculate_send_arg *space = arg;
1494 	if (bp != NULL && !BP_IS_HOLE(bp)) {
1495 		space->uncompressed += BP_GET_UCSIZE(bp);
1496 		space->compressed += BP_GET_PSIZE(bp);
1497 	}
1498 	return (0);
1499 }
1500 
1501 /*
1502  * Given a desination snapshot and a TXG, calculate the approximate size of a
1503  * send stream sent from that TXG. from_txg may be zero, indicating that the
1504  * whole snapshot will be sent.
1505  */
1506 int
1507 dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
1508     boolean_t stream_compressed, uint64_t *sizep)
1509 {
1510 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1511 	int err;
1512 	struct calculate_send_arg size = { 0 };
1513 
1514 	ASSERT(dsl_pool_config_held(dp));
1515 
1516 	/* tosnap must be a snapshot */
1517 	if (!ds->ds_is_snapshot)
1518 		return (SET_ERROR(EINVAL));
1519 
1520 	/* verify that from_txg is before the provided snapshot was taken */
1521 	if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
1522 		return (SET_ERROR(EXDEV));
1523 	}
1524 
1525 	/*
1526 	 * traverse the blocks of the snapshot with birth times after
1527 	 * from_txg, summing their uncompressed size
1528 	 */
1529 	err = traverse_dataset(ds, from_txg,
1530 	    TRAVERSE_POST | TRAVERSE_NO_DECRYPT,
1531 	    dmu_calculate_send_traversal, &size);
1532 	if (err)
1533 		return (err);
1534 
1535 	err = dmu_adjust_send_estimate_for_indirects(ds, size.uncompressed,
1536 	    size.compressed, stream_compressed, sizep);
1537 	return (err);
1538 }
1539