/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
 * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
 * Copyright 2014 HybridCluster. All rights reserved.
 * Copyright 2016 RackTop Systems.
 * Copyright (c) 2014 Integros [integros.com]
 */

#include <sys/dmu.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dbuf.h>
#include <sys/dnode.h>
#include <sys/zfs_context.h>
#include <sys/dmu_objset.h>
#include <sys/dmu_traverse.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/zfs_ioctl.h>
#include <sys/zap.h>
#include <sys/zio_checksum.h>
#include <sys/zfs_znode.h>
#include <zfs_fletcher.h>
#include <sys/avl.h>
#include <sys/ddt.h>
#include <sys/zfs_onexit.h>
#include <sys/dmu_send.h>
#include <sys/dsl_destroy.h>
#include <sys/blkptr.h>
#include <sys/dsl_bookmark.h>
#include <sys/zfeature.h>
#include <sys/bqueue.h>

/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
int zfs_send_corrupt_data = B_FALSE;
int zfs_send_queue_length = SPA_MAXBLOCKSIZE;
/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
int zfs_send_set_freerecords_bit = B_TRUE;
/* Set this tunable to FALSE to disable sending unmodified spill blocks. */
int zfs_send_unmodified_spill_blocks = B_TRUE;

/*
 * Use this to override the recordsize calculation for fast zfs send estimates.
 */
uint64_t zfs_override_estimate_recordsize = 0;

#define	BP_SPAN(datablkszsec, indblkshift, level) \
	(((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
	(level) * (indblkshift - SPA_BLKPTRSHIFT)))

struct send_thread_arg {
	bqueue_t	q;
	dsl_dataset_t	*ds;		/* Dataset to traverse */
	uint64_t	fromtxg;	/* Traverse from this txg */
	int		flags;		/* flags to pass to traverse_dataset */
	int		error_code;
	boolean_t	cancel;
	zbookmark_phys_t resume;
};

struct send_block_record {
	boolean_t		eos_marker; /* Marks the end of the stream */
	blkptr_t		bp;
	zbookmark_phys_t	zb;
	uint8_t			indblkshift;
	uint16_t		datablkszsec;
	bqueue_node_t		ln;
};

static int do_dump(dmu_sendarg_t *dsa, struct send_block_record *data);

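/*
 * Write the given buffer to the send stream's output vnode and update
 * the stream offset (dsa_off) under the dataset's sendstream lock.
 * The vn_rdwr() error, if any, is saved in dsa_err and returned.
 */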
static int
dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
{
	dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
	ssize_t resid; /* have to get resid to get detailed errno */

	/*
	 * The code does not rely on len being a multiple of 8.  We keep
	 * this assertion because of the corresponding assertion in
	 * receive_read().  Keeping this assertion ensures that we do not
	 * inadvertently break backwards compatibility (causing the assertion
	 * in receive_read() to trigger on old software). Newer feature flags
	 * (such as raw send) may break this assertion since they were
	 * introduced after the requirement was made obsolete.
	 */

	ASSERT(len % 8 == 0 ||
	    (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) != 0);

	dsp->dsa_err = vn_rdwr(UIO_WRITE, dsp->dsa_vp,
	    (caddr_t)buf, len,
	    0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);

	mutex_enter(&ds->ds_sendstream_lock);
	*dsp->dsa_off += len;
	mutex_exit(&ds->ds_sendstream_lock);

	return (dsp->dsa_err);
}

/*
 * For all record types except BEGIN, fill in the checksum (overlaid in
 * drr_u.drr_checksum.drr_checksum).  The checksum verifies everything
 * up to the start of the checksum itself.
 */
static int
dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
{
	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
	(void) fletcher_4_incremental_native(dsp->dsa_drr,
	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    &dsp->dsa_zc);
	if (dsp->dsa_drr->drr_type == DRR_BEGIN) {
		dsp->dsa_sent_begin = B_TRUE;
	} else {
		ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
		    drr_checksum.drr_checksum));
		dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
	}
	if (dsp->dsa_drr->drr_type == DRR_END) {
		dsp->dsa_sent_end = B_TRUE;
	}
	(void) fletcher_4_incremental_native(&dsp->dsa_drr->
	    drr_u.drr_checksum.drr_checksum,
	    sizeof (zio_cksum_t), &dsp->dsa_zc);
	if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
		return (SET_ERROR(EINTR));
	if (payload_len != 0) {
		(void) fletcher_4_incremental_native(payload, payload_len,
		    &dsp->dsa_zc);
		if (dump_bytes(dsp, payload, payload_len) != 0)
			return (SET_ERROR(EINTR));
	}
	return (0);
}

/*
 * Fill in the drr_free struct, or perform aggregation if the previous record is
 * also a free record, and the two are adjacent.
 *
 * Note that we send free records even for a full send, because we want to be
 * able to receive a full send as a clone, which requires a list of all the free
 * and freeobject records that were generated on the source.
 */
static int
dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    uint64_t length)
{
	struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);

	/*
	 * When we receive a free record, dbuf_free_range() assumes
	 * that the receiving system doesn't have any dbufs in the range
	 * being freed.  This is always true because there is a one-record
	 * constraint: we only send one WRITE record for any given
	 * object,offset.  We know that the one-record constraint is
	 * true because we always send data in increasing order by
	 * object,offset.
	 *
	 * If the increasing-order constraint ever changes, we should find
	 * another way to assert that the one-record constraint is still
	 * satisfied.
	 */
	ASSERT(object > dsp->dsa_last_data_object ||
	    (object == dsp->dsa_last_data_object &&
	    offset > dsp->dsa_last_data_offset));

	/*
	 * If there is a pending op, but it's not PENDING_FREE, push it out,
	 * since free block aggregation can only be done for blocks of the
	 * same type (i.e., DRR_FREE records can only be aggregated with
	 * other DRR_FREE records; DRR_FREEOBJECTS records can only be
	 * aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	if (dsp->dsa_pending_op == PENDING_FREE) {
		/*
		 * There should never be a PENDING_FREE if length is
		 * DMU_OBJECT_END (because dump_dnode is the only place where
		 * this function is called with a DMU_OBJECT_END, and only after
		 * flushing any pending record).
		 */
		ASSERT(length != DMU_OBJECT_END);
		/*
		 * Check to see whether this free block can be aggregated
		 * with the pending one.
		 */
		if (drrf->drr_object == object && drrf->drr_offset +
		    drrf->drr_length == offset) {
			if (offset + length < offset)
				drrf->drr_length = DMU_OBJECT_END;
			else
				drrf->drr_length += length;
			return (0);
		} else {
			/* not a continuation.  Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}
	/* create a FREE record and make it pending */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREE;
	drrf->drr_object = object;
	drrf->drr_offset = offset;
	if (offset + length < offset)
		drrf->drr_length = DMU_OBJECT_END;
	else
		drrf->drr_length = length;
	drrf->drr_toguid = dsp->dsa_toguid;
	if (length == DMU_OBJECT_END) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
	} else {
		dsp->dsa_pending_op = PENDING_FREE;
	}

	return (0);
}

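/*
 * Emit a DRR_WRITE record for a single logical block.  Any pending
 * free/freeobjects aggregation is flushed first, and the compression,
 * encryption, and dedup-checksum fields are filled in from the block
 * pointer when the stream format allows it.
 */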
static int
dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type, uint64_t object,
    uint64_t offset, int lsize, int psize, const blkptr_t *bp, void *data)
{
	uint64_t payload_size;
	boolean_t raw = (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW);
	struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);

	/*
	 * We send data in increasing object, offset order.
	 * See comment in dump_free() for details.
	 */
	ASSERT(object > dsp->dsa_last_data_object ||
	    (object == dsp->dsa_last_data_object &&
	    offset > dsp->dsa_last_data_offset));
	dsp->dsa_last_data_object = object;
	dsp->dsa_last_data_offset = offset + lsize - 1;

	/*
	 * If there is any kind of pending aggregation (currently either
	 * a grouping of free objects or free blocks), push it out to
	 * the stream, since aggregation can't be done across operations
	 * of different types.
	 */
	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}
	/* write a WRITE record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_WRITE;
	drrw->drr_object = object;
	drrw->drr_type = type;
	drrw->drr_offset = offset;
	drrw->drr_toguid = dsp->dsa_toguid;
	drrw->drr_logical_size = lsize;

	/* only set the compression fields if the buf is compressed or raw */
	if (raw || lsize != psize) {
		ASSERT(!BP_IS_EMBEDDED(bp));
		ASSERT3S(psize, >, 0);

		if (raw) {
			ASSERT(BP_IS_PROTECTED(bp));

			/*
			 * This is a raw protected block so we need to pass
			 * along everything the receiving side will need to
			 * interpret this block, including the byteswap, salt,
			 * IV, and MAC.
			 */
			if (BP_SHOULD_BYTESWAP(bp))
				drrw->drr_flags |= DRR_RAW_BYTESWAP;
			zio_crypt_decode_params_bp(bp, drrw->drr_salt,
			    drrw->drr_iv);
			zio_crypt_decode_mac_bp(bp, drrw->drr_mac);
		} else {
			/* this is a compressed block */
			ASSERT(dsp->dsa_featureflags &
			    DMU_BACKUP_FEATURE_COMPRESSED);
			ASSERT(!BP_SHOULD_BYTESWAP(bp));
			ASSERT(!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)));
			ASSERT3U(BP_GET_COMPRESS(bp), !=, ZIO_COMPRESS_OFF);
			ASSERT3S(lsize, >=, psize);
		}

		/* set fields common to compressed and raw sends */
		drrw->drr_compressiontype = BP_GET_COMPRESS(bp);
		drrw->drr_compressed_size = psize;
		payload_size = drrw->drr_compressed_size;
	} else {
		payload_size = drrw->drr_logical_size;
	}

	if (bp == NULL || BP_IS_EMBEDDED(bp) || (BP_IS_PROTECTED(bp) && !raw)) {
		/*
		 * There's no pre-computed checksum for partial-block writes,
		 * embedded BP's, or encrypted BP's that are being sent as
		 * plaintext, so (like fletcher4-checksummed blocks) userland
		 * will have to compute a dedup-capable checksum itself.
		 */
		drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
	} else {
		drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
		if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
		    ZCHECKSUM_FLAG_DEDUP)
			drrw->drr_flags |= DRR_CHECKSUM_DEDUP;
		DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
		DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
		DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
		DDK_SET_CRYPT(&drrw->drr_key, BP_IS_PROTECTED(bp));
		drrw->drr_key.ddk_cksum = bp->blk_cksum;
	}

	if (dump_record(dsp, data, payload_size) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

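/*
 * Emit a DRR_WRITE_EMBEDDED record for a block whose data is stored
 * directly in the block pointer (BP_IS_EMBEDDED), copying the embedded
 * payload into the stream.
 */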
static int
dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    int blksz, const blkptr_t *bp)
{
	char buf[BPE_PAYLOAD_SIZE];
	struct drr_write_embedded *drrw =
	    &(dsp->dsa_drr->drr_u.drr_write_embedded);

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (EINTR);
		dsp->dsa_pending_op = PENDING_NONE;
	}

	ASSERT(BP_IS_EMBEDDED(bp));

	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
	drrw->drr_object = object;
	drrw->drr_offset = offset;
	drrw->drr_length = blksz;
	drrw->drr_toguid = dsp->dsa_toguid;
	drrw->drr_compression = BP_GET_COMPRESS(bp);
	drrw->drr_etype = BPE_GET_ETYPE(bp);
	drrw->drr_lsize = BPE_GET_LSIZE(bp);
	drrw->drr_psize = BPE_GET_PSIZE(bp);

	decode_embedded_bp_compressed(bp, buf);

	if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
		return (EINTR);
	return (0);
}

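/*
 * Emit a DRR_SPILL record for an object's spill block, including the
 * raw-send crypt parameters when the stream is a raw send.
 */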
static int
dump_spill(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t object, void *data)
{
	struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);
	uint64_t blksz = BP_GET_LSIZE(bp);
	uint64_t payload_size = blksz;

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	/* write a SPILL record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_SPILL;
	drrs->drr_object = object;
	drrs->drr_length = blksz;
	drrs->drr_toguid = dsp->dsa_toguid;

	/* See comment in dump_dnode() for full details */
	if (zfs_send_unmodified_spill_blocks &&
	    (bp->blk_birth <= dsp->dsa_fromtxg)) {
		drrs->drr_flags |= DRR_SPILL_UNMODIFIED;
	}

	/* handle raw send fields */
	if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
		ASSERT(BP_IS_PROTECTED(bp));

		if (BP_SHOULD_BYTESWAP(bp))
			drrs->drr_flags |= DRR_RAW_BYTESWAP;
		drrs->drr_compressiontype = BP_GET_COMPRESS(bp);
		drrs->drr_compressed_size = BP_GET_PSIZE(bp);
		zio_crypt_decode_params_bp(bp, drrs->drr_salt, drrs->drr_iv);
		zio_crypt_decode_mac_bp(bp, drrs->drr_mac);
		payload_size = drrs->drr_compressed_size;
	}

	if (dump_record(dsp, data, payload_size) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

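/*
 * Emit (or aggregate into a pending) DRR_FREEOBJECTS record covering
 * numobjs object numbers starting at firstobj.
 */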
static int
dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
{
	struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);

	/*
	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
	 * push it out, since free block aggregation can only be done for
	 * blocks of the same type (i.e., DRR_FREE records can only be
	 * aggregated with other DRR_FREE records; DRR_FREEOBJECTS records
	 * can only be aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}
	if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
		/*
		 * See whether this free object array can be aggregated
		 * with the pending one.
		 */
		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
			drrfo->drr_numobjs += numobjs;
			return (0);
		} else {
			/* can't be aggregated.  Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}

	/* write a FREEOBJECTS record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
	drrfo->drr_firstobj = firstobj;
	drrfo->drr_numobjs = numobjs;
	drrfo->drr_toguid = dsp->dsa_toguid;

	dsp->dsa_pending_op = PENDING_FREEOBJECTS;

	return (0);
}

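/*
 * Emit a DRR_OBJECT record describing the given dnode (or a
 * DRR_FREEOBJECTS record if the object has been freed), followed by a
 * free record for any space past the end of the file and, if enabled,
 * a DRR_SPILL record for an unmodified spill block.
 */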
static int
dump_dnode(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t object,
    dnode_phys_t *dnp)
{
	struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);
	int bonuslen;

	if (object < dsp->dsa_resume_object) {
		/*
		 * Note: when resuming, we will visit all the dnodes in
		 * the block of dnodes that we are resuming from.  In
		 * this case it's unnecessary to send the dnodes prior to
		 * the one we are resuming from.  We should be at most one
		 * block's worth of dnodes behind the resume point.
		 */
		ASSERT3U(dsp->dsa_resume_object - object, <,
		    1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
		return (0);
	}

	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
		return (dump_freeobjects(dsp, object, 1));

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	/* write an OBJECT record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_OBJECT;
	drro->drr_object = object;
	drro->drr_type = dnp->dn_type;
	drro->drr_bonustype = dnp->dn_bonustype;
	drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
	drro->drr_bonuslen = dnp->dn_bonuslen;
	drro->drr_dn_slots = dnp->dn_extra_slots + 1;
	drro->drr_checksumtype = dnp->dn_checksum;
	drro->drr_compress = dnp->dn_compress;
	drro->drr_toguid = dsp->dsa_toguid;

	if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
	    drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
		drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;

	bonuslen = P2ROUNDUP(dnp->dn_bonuslen, 8);

	if ((dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW)) {
		ASSERT(BP_IS_ENCRYPTED(bp));

		if (BP_SHOULD_BYTESWAP(bp))
			drro->drr_flags |= DRR_RAW_BYTESWAP;

		/* needed for reconstructing dnp on recv side */
		drro->drr_maxblkid = dnp->dn_maxblkid;
		drro->drr_indblkshift = dnp->dn_indblkshift;
		drro->drr_nlevels = dnp->dn_nlevels;
		drro->drr_nblkptr = dnp->dn_nblkptr;

		/*
		 * Since we encrypt the entire bonus area, the (raw) part
		 * beyond the bonuslen is actually nonzero, so we need
		 * to send it.
		 */
		if (bonuslen != 0) {
			drro->drr_raw_bonuslen = DN_MAX_BONUS_LEN(dnp);
			bonuslen = drro->drr_raw_bonuslen;
		}
	}

	/*
	 * DRR_OBJECT_SPILL is set for every dnode which references a
	 * spill block.  This allows the receiving pool to definitively
	 * determine when a spill block should be kept or freed.
	 */
	if (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR)
		drro->drr_flags |= DRR_OBJECT_SPILL;

	if (dump_record(dsp, DN_BONUS(dnp), bonuslen) != 0)
		return (SET_ERROR(EINTR));

	/* Free anything past the end of the file. */
	if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), DMU_OBJECT_END) != 0)
		return (SET_ERROR(EINTR));

	/*
	 * Send DRR_SPILL records for unmodified spill blocks.  This is useful
	 * because changing certain attributes of the object (e.g. blocksize)
	 * can cause old versions of ZFS to incorrectly remove a spill block.
	 * Including these records in the stream forces an up-to-date version
	 * to always be written, ensuring they're never lost.  Current versions
	 * of the code which understand the DRR_FLAG_SPILL_BLOCK feature can
	 * ignore these unmodified spill blocks.
	 */
	if (zfs_send_unmodified_spill_blocks &&
	    (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR) &&
	    (DN_SPILL_BLKPTR(dnp)->blk_birth <= dsp->dsa_fromtxg)) {
		struct send_block_record record;

		bzero(&record, sizeof (struct send_block_record));
		record.eos_marker = B_FALSE;
		record.bp = *DN_SPILL_BLKPTR(dnp);
		SET_BOOKMARK(&(record.zb), dmu_objset_id(dsp->dsa_os),
		    object, 0, DMU_SPILL_BLKID);

		if (do_dump(dsp, &record) != 0)
			return (SET_ERROR(EINTR));
	}

	if (dsp->dsa_err != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

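/*
 * Emit a DRR_OBJECT_RANGE record carrying the encryption parameters
 * for a raw-send block of dnodes.
 */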
static int
dump_object_range(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t firstobj,
    uint64_t numslots)
{
	struct drr_object_range *drror =
	    &(dsp->dsa_drr->drr_u.drr_object_range);

	/* we only use this record type for raw sends */
	ASSERT(BP_IS_PROTECTED(bp));
	ASSERT(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW);
	ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF);
	ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_DNODE);
	ASSERT0(BP_GET_LEVEL(bp));

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_OBJECT_RANGE;
	drror->drr_firstobj = firstobj;
	drror->drr_numslots = numslots;
	drror->drr_toguid = dsp->dsa_toguid;
	if (BP_SHOULD_BYTESWAP(bp))
		drror->drr_flags |= DRR_RAW_BYTESWAP;
	zio_crypt_decode_params_bp(bp, drror->drr_salt, drror->drr_iv);
	zio_crypt_decode_mac_bp(bp, drror->drr_mac);

	if (dump_record(dsp, NULL, 0) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

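/*
 * Decide whether this block pointer can be sent as a
 * DRR_WRITE_EMBEDDED record, based on the stream's feature flags.
 */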
static boolean_t
backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
{
	if (!BP_IS_EMBEDDED(bp))
		return (B_FALSE);

	/*
	 * Compression function must be legacy, or explicitly enabled.
	 */
	if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
	    !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LZ4)))
		return (B_FALSE);

	/*
	 * Embed type must be explicitly enabled.
	 */
	switch (BPE_GET_ETYPE(bp)) {
	case BP_EMBEDDED_TYPE_DATA:
		if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
			return (B_TRUE);
		break;
	default:
		return (B_FALSE);
	}
	return (B_FALSE);
}

/*
 * This is the callback function to traverse_dataset that acts as the worker
 * thread for dmu_send_impl.
 */
/*ARGSUSED*/
static int
send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
{
	struct send_thread_arg *sta = arg;
	struct send_block_record *record;
	uint64_t record_size;
	int err = 0;

	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
	    zb->zb_object >= sta->resume.zb_object);
	ASSERT3P(sta->ds, !=, NULL);

	if (sta->cancel)
		return (SET_ERROR(EINTR));

	if (bp == NULL) {
		ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
		return (0);
	} else if (zb->zb_level < 0) {
		return (0);
	}

	record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
	record->eos_marker = B_FALSE;
	record->bp = *bp;
	record->zb = *zb;
	record->indblkshift = dnp->dn_indblkshift;
	record->datablkszsec = dnp->dn_datablkszsec;
	record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
	bqueue_enqueue(&sta->q, record, record_size);

	return (err);
}

/*
 * This function kicks off the traverse_dataset call.  It also handles setting
 * the error code of the thread in case something goes wrong, and pushes the
 * End of Stream record when the traverse_dataset call has finished.  If there
 * is no dataset to traverse, the thread immediately pushes an End of Stream
 * marker.
 */
static void
send_traverse_thread(void *arg)
{
	struct send_thread_arg *st_arg = arg;
	int err;
	struct send_block_record *data;

	if (st_arg->ds != NULL) {
		err = traverse_dataset_resume(st_arg->ds,
		    st_arg->fromtxg, &st_arg->resume,
		    st_arg->flags, send_cb, st_arg);

		if (err != EINTR)
			st_arg->error_code = err;
	}
	data = kmem_zalloc(sizeof (*data), KM_SLEEP);
	data->eos_marker = B_TRUE;
	bqueue_enqueue(&st_arg->q, data, 1);
	thread_exit();
}

/*
 * This function actually handles figuring out what kind of record needs to be
 * dumped, reading the data (which has hopefully been prefetched), and calling
 * the appropriate helper function.
 */
static int
do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
{
	dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
	const blkptr_t *bp = &data->bp;
	const zbookmark_phys_t *zb = &data->zb;
	uint8_t indblkshift = data->indblkshift;
	uint16_t dblkszsec = data->datablkszsec;
	spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
	dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
	int err = 0;

	ASSERT3U(zb->zb_level, >=, 0);

	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
	    zb->zb_object >= dsa->dsa_resume_object);

	/*
	 * All bps of an encrypted os should have the encryption bit set.
	 * If this is not true, it indicates tampering and we report an error.
	 */
	if (dsa->dsa_os->os_encrypted &&
	    !BP_IS_HOLE(bp) && !BP_USES_CRYPT(bp)) {
		spa_log_error(spa, zb);
		zfs_panic_recover("unencrypted block in encrypted "
		    "object set %llu", ds->ds_object);
		return (SET_ERROR(EIO));
	}

	if (zb->zb_object != DMU_META_DNODE_OBJECT &&
	    DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
		return (0);
	} else if (BP_IS_HOLE(bp) &&
	    zb->zb_object == DMU_META_DNODE_OBJECT) {
		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
		uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
		err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
	} else if (BP_IS_HOLE(bp)) {
		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
		uint64_t offset = zb->zb_blkid * span;
		/* Don't dump free records for offsets > DMU_OBJECT_END */
		if (zb->zb_blkid == 0 || span <= DMU_OBJECT_END / zb->zb_blkid)
			err = dump_free(dsa, zb->zb_object, offset, span);
	} else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
		return (0);
	} else if (type == DMU_OT_DNODE) {
		int epb = BP_GET_LSIZE(bp) >> DNODE_SHIFT;
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;
		enum zio_flag zioflags = ZIO_FLAG_CANFAIL;

		if (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
			ASSERT(BP_IS_ENCRYPTED(bp));
			ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF);
			zioflags |= ZIO_FLAG_RAW;
		}

		ASSERT0(zb->zb_level);

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0)
			return (SET_ERROR(EIO));

		dnode_phys_t *blk = abuf->b_data;
		uint64_t dnobj = zb->zb_blkid * epb;

		/*
		 * Raw sends require sending encryption parameters for the
		 * block of dnodes. Regular sends do not need to send this
		 * info.
		 */
		if (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
			ASSERT(arc_is_encrypted(abuf));
			err = dump_object_range(dsa, bp, dnobj, epb);
		}

		if (err == 0) {
			for (int i = 0; i < epb;
			    i += blk[i].dn_extra_slots + 1) {
				err = dump_dnode(dsa, bp, dnobj + i, blk + i);
				if (err != 0)
					break;
			}
		}
		arc_buf_destroy(abuf, &abuf);
	} else if (type == DMU_OT_SA) {
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;
		enum zio_flag zioflags = ZIO_FLAG_CANFAIL;

		if (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
			ASSERT(BP_IS_PROTECTED(bp));
			zioflags |= ZIO_FLAG_RAW;
		}

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0)
			return (SET_ERROR(EIO));

		err = dump_spill(dsa, bp, zb->zb_object, abuf->b_data);
		arc_buf_destroy(abuf, &abuf);
	} else if (backup_do_embed(dsa, bp)) {
		/* it's an embedded level-0 block of a regular object */
		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
		ASSERT0(zb->zb_level);
		err = dump_write_embedded(dsa, zb->zb_object,
		    zb->zb_blkid * blksz, blksz, bp);
	} else {
		/* it's a level-0 block of a regular object */
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;
		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
		uint64_t offset;

		/*
		 * If we have large blocks stored on disk but the send flags
		 * don't allow us to send large blocks, we split the data from
		 * the arc buf into chunks.
		 */
		boolean_t split_large_blocks = blksz > SPA_OLD_MAXBLOCKSIZE &&
		    !(dsa->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);

		/*
		 * Raw sends require that we always get raw data as it exists
		 * on disk, so we assert that we are not splitting blocks here.
		 */
		boolean_t request_raw =
		    (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) != 0;

		/*
		 * We should only request compressed data from the ARC if all
		 * the following are true:
		 *  - stream compression was requested
		 *  - we aren't splitting large blocks into smaller chunks
		 *  - the data won't need to be byteswapped before sending
		 *  - this isn't an embedded block
		 *  - this isn't metadata (if receiving on a different endian
		 *    system it can be byteswapped more easily)
		 */
		boolean_t request_compressed =
		    (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
		    !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
		    !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));

		IMPLY(request_raw, !split_large_blocks);
		IMPLY(request_raw, BP_IS_PROTECTED(bp));
		ASSERT0(zb->zb_level);
		ASSERT(zb->zb_object > dsa->dsa_resume_object ||
		    (zb->zb_object == dsa->dsa_resume_object &&
		    zb->zb_blkid * blksz >= dsa->dsa_resume_offset));

		ASSERT3U(blksz, ==, BP_GET_LSIZE(bp));

		enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
		if (request_raw)
			zioflags |= ZIO_FLAG_RAW;
		else if (request_compressed)
			zioflags |= ZIO_FLAG_RAW_COMPRESS;

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0) {
			if (zfs_send_corrupt_data) {
				/* Send a block filled with 0x"zfs badd bloc" */
				abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
				    blksz);
				uint64_t *ptr;
				for (ptr = abuf->b_data;
				    (char *)ptr < (char *)abuf->b_data + blksz;
				    ptr++)
					*ptr = 0x2f5baddb10cULL;
			} else {
				return (SET_ERROR(EIO));
			}
		}

		offset = zb->zb_blkid * blksz;

		if (split_large_blocks) {
			ASSERT0(arc_is_encrypted(abuf));
			ASSERT3U(arc_get_compression(abuf), ==,
			    ZIO_COMPRESS_OFF);
			char *buf = abuf->b_data;
			while (blksz > 0 && err == 0) {
				int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
				err = dump_write(dsa, type, zb->zb_object,
				    offset, n, n, NULL, buf);
				offset += n;
				buf += n;
				blksz -= n;
			}
		} else {
			err = dump_write(dsa, type, zb->zb_object, offset,
			    blksz, arc_buf_size(abuf), bp, abuf->b_data);
		}
		arc_buf_destroy(abuf, &abuf);
	}

	ASSERT(err == 0 || err == EINTR);
	return (err);
}

/*
 * Pop the new data off the queue, and free the old data.
 */
static struct send_block_record *
get_next_record(bqueue_t *bq, struct send_block_record *data)
{
	struct send_block_record *tmp = bqueue_dequeue(bq);
	kmem_free(data, sizeof (*data));
	return (tmp);
}

/*
 * Actually do the bulk of the work in a zfs send.
 *
 * Note: Releases dp using the specified tag.
 */
static int
dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
    zfs_bookmark_phys_t *ancestor_zb, boolean_t is_clone,
    boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
    boolean_t rawok, int outfd, uint64_t resumeobj, uint64_t resumeoff,
    vnode_t *vp, offset_t *off)
{
	objset_t *os;
	dmu_replay_record_t *drr;
	dmu_sendarg_t *dsp;
	int err;
	uint64_t fromtxg = 0;
	uint64_t featureflags = 0;
	struct send_thread_arg to_arg = { 0 };

	err = dmu_objset_from_ds(to_ds, &os);
	if (err != 0) {
		dsl_pool_rele(dp, tag);
		return (err);
	}

	/*
	 * If this is a non-raw send of an encrypted ds, we can ensure that
	 * the objset_phys_t is authenticated. This is safe because this is
	 * either a snapshot or we have owned the dataset, ensuring that
	 * it can't be modified.
	 */
	if (!rawok && os->os_encrypted &&
	    arc_is_unauthenticated(os->os_phys_buf)) {
		zbookmark_phys_t zb;

		SET_BOOKMARK(&zb, to_ds->ds_object, ZB_ROOT_OBJECT,
		    ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
		err = arc_untransform(os->os_phys_buf, os->os_spa,
		    &zb, B_FALSE);
		if (err != 0) {
			dsl_pool_rele(dp, tag);
			return (err);
		}

		ASSERT0(arc_is_unauthenticated(os->os_phys_buf));
	}

	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
	drr->drr_type = DRR_BEGIN;
	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
	DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
	    DMU_SUBSTREAM);

#ifdef _KERNEL
	if (dmu_objset_type(os) == DMU_OST_ZFS) {
		uint64_t version;
		if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
			kmem_free(drr, sizeof (dmu_replay_record_t));
			dsl_pool_rele(dp, tag);
			return (SET_ERROR(EINVAL));
		}
		if (version >= ZPL_VERSION_SA) {
			featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
		}
	}
#endif

	/* raw sends imply large_block_ok */
	if ((large_block_ok || rawok) &&
	    to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_BLOCKS])
		featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
	if (to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_DNODE])
		featureflags |= DMU_BACKUP_FEATURE_LARGE_DNODE;

	/* encrypted datasets will not have embedded blocks */
	if ((embedok || rawok) && !os->os_encrypted &&
	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
		featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
	}

	/* raw send implies compressok */
	if (compressok || rawok)
		featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;
	if (rawok && os->os_encrypted)
		featureflags |= DMU_BACKUP_FEATURE_RAW;

	if ((featureflags &
	    (DMU_BACKUP_FEATURE_EMBED_DATA | DMU_BACKUP_FEATURE_COMPRESSED |
	    DMU_BACKUP_FEATURE_RAW)) != 0 &&
	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) {
		featureflags |= DMU_BACKUP_FEATURE_LZ4;
	}

	if (resumeobj != 0 || resumeoff != 0) {
		featureflags |= DMU_BACKUP_FEATURE_RESUMING;
	}

	DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
	    featureflags);

	drr->drr_u.drr_begin.drr_creation_time =
	    dsl_dataset_phys(to_ds)->ds_creation_time;
	drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
	if (is_clone)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
	drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
	if (zfs_send_set_freerecords_bit)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_FREERECORDS;

	drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_SPILL_BLOCK;

	if (ancestor_zb != NULL) {
		drr->drr_u.drr_begin.drr_fromguid =
		    ancestor_zb->zbm_guid;
		fromtxg = ancestor_zb->zbm_creation_txg;
	}
	dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
	if (!to_ds->ds_is_snapshot) {
		(void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
		    sizeof (drr->drr_u.drr_begin.drr_toname));
	}

	dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);

	dsp->dsa_drr = drr;
	dsp->dsa_vp = vp;
	dsp->dsa_outfd = outfd;
	dsp->dsa_proc = curproc;
	dsp->dsa_os = os;
	dsp->dsa_off = off;
	dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	dsp->dsa_fromtxg = fromtxg;
	dsp->dsa_pending_op = PENDING_NONE;
	dsp->dsa_featureflags = featureflags;
	dsp->dsa_resume_object = resumeobj;
	dsp->dsa_resume_offset = resumeoff;

	mutex_enter(&to_ds->ds_sendstream_lock);
	list_insert_head(&to_ds->ds_sendstreams, dsp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	dsl_dataset_long_hold(to_ds, FTAG);
	dsl_pool_rele(dp, tag);

	void *payload = NULL;
	size_t payload_len = 0;
	/* handle features that require a DRR_BEGIN payload */
	if (featureflags &
	    (DMU_BACKUP_FEATURE_RESUMING | DMU_BACKUP_FEATURE_RAW)) {
		nvlist_t *keynvl = NULL;
		nvlist_t *nvl = fnvlist_alloc();

		if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
			dmu_object_info_t to_doi;
			err = dmu_object_info(os, resumeobj, &to_doi);
			if (err != 0) {
				fnvlist_free(nvl);
				goto out;
			}

			SET_BOOKMARK(&to_arg.resume, to_ds->ds_object,
			    resumeobj, 0,
			    resumeoff / to_doi.doi_data_block_size);

			fnvlist_add_uint64(nvl, "resume_object", resumeobj);
			fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
		}

		if (featureflags & DMU_BACKUP_FEATURE_RAW) {
			uint64_t ivset_guid = (ancestor_zb != NULL) ?
			    ancestor_zb->zbm_ivset_guid : 0;

			ASSERT(os->os_encrypted);

			err = dsl_crypto_populate_key_nvlist(to_ds,
			    ivset_guid, &keynvl);
			if (err != 0) {
				fnvlist_free(nvl);
				goto out;
			}

			fnvlist_add_nvlist(nvl, "crypt_keydata", keynvl);
		}

		payload = fnvlist_pack(nvl, &payload_len);
		drr->drr_payloadlen = payload_len;
		fnvlist_free(keynvl);
		fnvlist_free(nvl);
	}

	err = dump_record(dsp, payload, payload_len);
	fnvlist_pack_free(payload, payload_len);
	if (err != 0) {
		err = dsp->dsa_err;
		goto out;
	}

	err = bqueue_init(&to_arg.q,
	    MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
	    offsetof(struct send_block_record, ln));
	to_arg.error_code = 0;
	to_arg.cancel = B_FALSE;
	to_arg.ds = to_ds;
	to_arg.fromtxg = fromtxg;
	to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
	if (rawok)
		to_arg.flags |= TRAVERSE_NO_DECRYPT;
	(void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, curproc,
	    TS_RUN, minclsyspri);

	struct send_block_record *to_data;
	to_data = bqueue_dequeue(&to_arg.q);

	while (!to_data->eos_marker && err == 0) {
		err = do_dump(dsp, to_data);
		to_data = get_next_record(&to_arg.q, to_data);
		if (issig(JUSTLOOKING) && issig(FORREAL))
			err = EINTR;
	}

	if (err != 0) {
		to_arg.cancel = B_TRUE;
		while (!to_data->eos_marker) {
			to_data = get_next_record(&to_arg.q, to_data);
		}
	}
	kmem_free(to_data, sizeof (*to_data));

	bqueue_destroy(&to_arg.q);

	if (err == 0 && to_arg.error_code != 0)
		err = to_arg.error_code;

	if (err != 0)
		goto out;

	if (dsp->dsa_pending_op != PENDING_NONE)
		if (dump_record(dsp, NULL, 0) != 0)
			err = SET_ERROR(EINTR);

	if (err != 0) {
		if (err == EINTR && dsp->dsa_err != 0)
			err = dsp->dsa_err;
		goto out;
	}

	bzero(drr, sizeof (dmu_replay_record_t));
	drr->drr_type = DRR_END;
	drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
	drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;

	if (dump_record(dsp, NULL, 0) != 0)
		err = dsp->dsa_err;
out:
	mutex_enter(&to_ds->ds_sendstream_lock);
	list_remove(&to_ds->ds_sendstreams, dsp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	VERIFY(err != 0 || (dsp->dsa_sent_begin && dsp->dsa_sent_end));

	kmem_free(drr, sizeof (dmu_replay_record_t));
	kmem_free(dsp, sizeof (dmu_sendarg_t));

	dsl_dataset_long_rele(to_ds, FTAG);

	return (err);
}

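/*
 * Send a full or incremental stream identified by dataset object
 * numbers rather than names: hold the pool and snapshots, build the
 * ancestor bookmark for an incremental, and hand off to dmu_send_impl().
 */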
int
dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
    boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
    boolean_t rawok, int outfd, vnode_t *vp, offset_t *off)
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	dsl_dataset_t *fromds = NULL;
	ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
	int err;

	err = dsl_pool_hold(pool, FTAG, &dp);
	if (err != 0)
		return (err);

	err = dsl_dataset_hold_obj_flags(dp, tosnap, dsflags, FTAG, &ds);
	if (err != 0) {
		dsl_pool_rele(dp, FTAG);
		return (err);
	}

	if (fromsnap != 0) {
		zfs_bookmark_phys_t zb = { 0 };
		boolean_t is_clone;

		err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
		if (err != 0) {
			dsl_dataset_rele_flags(ds, dsflags, FTAG);
			dsl_pool_rele(dp, FTAG);
			return (err);
		}
		if (!dsl_dataset_is_before(ds, fromds, 0)) {
			err = SET_ERROR(EXDEV);
			dsl_dataset_rele(fromds, FTAG);
			dsl_dataset_rele_flags(ds, dsflags, FTAG);
			dsl_pool_rele(dp, FTAG);
			return (err);
		}

		zb.zbm_creation_time =
		    dsl_dataset_phys(fromds)->ds_creation_time;
		zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
		zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;

		if (dsl_dataset_is_zapified(fromds)) {
			(void) zap_lookup(dp->dp_meta_objset,
			    fromds->ds_object, DS_FIELD_IVSET_GUID, 8, 1,
			    &zb.zbm_ivset_guid);
		}

		is_clone = (fromds->ds_dir != ds->ds_dir);
		dsl_dataset_rele(fromds, FTAG);
		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
		    embedok, large_block_ok, compressok, rawok, outfd,
		    0, 0, vp, off);
	} else {
		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
		    embedok, large_block_ok, compressok, rawok, outfd,
		    0, 0, vp, off);
	}
	dsl_dataset_rele_flags(ds, dsflags, FTAG);
	return (err);
}

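/*
 * Name-based counterpart of dmu_send_obj().  The fromsnap may be a
 * snapshot or a bookmark; when sending a filesystem or volume head,
 * the dataset is owned for the duration so it cannot change.
 */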
int
dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
    boolean_t large_block_ok, boolean_t compressok, boolean_t rawok,
    int outfd, uint64_t resumeobj, uint64_t resumeoff, vnode_t *vp,
    offset_t *off)
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	int err;
	ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
	boolean_t owned = B_FALSE;

	if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
		return (SET_ERROR(EINVAL));

	err = dsl_pool_hold(tosnap, FTAG, &dp);
	if (err != 0)
		return (err);
	if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
		/*
		 * We are sending a filesystem or volume.  Ensure
		 * that it doesn't change by owning the dataset.
		 */
		err = dsl_dataset_own(dp, tosnap, dsflags, FTAG, &ds);
		owned = B_TRUE;
	} else {
		err = dsl_dataset_hold_flags(dp, tosnap, dsflags, FTAG, &ds);
	}
	if (err != 0) {
		dsl_pool_rele(dp, FTAG);
		return (err);
	}

	if (fromsnap != NULL) {
		zfs_bookmark_phys_t zb = { 0 };
		boolean_t is_clone = B_FALSE;
		int fsnamelen = strchr(tosnap, '@') - tosnap;

		/*
		 * If the fromsnap is in a different filesystem, then
		 * mark the send stream as a clone.
		 */
		if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
		    (fromsnap[fsnamelen] != '@' &&
		    fromsnap[fsnamelen] != '#')) {
			is_clone = B_TRUE;
		}

		if (strchr(fromsnap, '@')) {
			dsl_dataset_t *fromds;
			err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
			if (err == 0) {
				if (!dsl_dataset_is_before(ds, fromds, 0))
					err = SET_ERROR(EXDEV);
				zb.zbm_creation_time =
				    dsl_dataset_phys(fromds)->ds_creation_time;
				zb.zbm_creation_txg =
				    dsl_dataset_phys(fromds)->ds_creation_txg;
				zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
				is_clone = (ds->ds_dir != fromds->ds_dir);

				if (dsl_dataset_is_zapified(fromds)) {
					(void) zap_lookup(dp->dp_meta_objset,
					    fromds->ds_object,
					    DS_FIELD_IVSET_GUID, 8, 1,
					    &zb.zbm_ivset_guid);
				}
				dsl_dataset_rele(fromds, FTAG);
			}
		} else {
			err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
		}
		if (err != 0) {
			if (owned)
				dsl_dataset_disown(ds, dsflags, FTAG);
			else
				dsl_dataset_rele_flags(ds, dsflags, FTAG);

			dsl_pool_rele(dp, FTAG);
			return (err);
		}
		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
		    embedok, large_block_ok, compressok, rawok,
		    outfd, resumeobj, resumeoff, vp, off);
	} else {
		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
		    embedok, large_block_ok, compressok, rawok,
		    outfd, resumeobj, resumeoff, vp, off);
	}
	if (owned)
		dsl_dataset_disown(ds, dsflags, FTAG);
	else
		dsl_dataset_rele_flags(ds, dsflags, FTAG);

	return (err);
}

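/*
 * Convert the changed-data byte counts into a stream size estimate by
 * subtracting approximate indirect-block overhead and adding one
 * dmu_replay_record_t per data block.
 */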
static int
dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed,
    uint64_t compressed, boolean_t stream_compressed, uint64_t *sizep)
{
	int err = 0;
	uint64_t size;
	/*
	 * Assume that space (both on-disk and in-stream) is dominated by
	 * data.  We will adjust for indirect blocks and the copies property,
	 * but ignore per-object space used (e.g., dnodes and DRR_OBJECT records).
	 */
	uint64_t recordsize;
	uint64_t record_count;
	objset_t *os;
	VERIFY0(dmu_objset_from_ds(ds, &os));

	/* Assume all (uncompressed) blocks are recordsize. */
	if (zfs_override_estimate_recordsize != 0) {
		recordsize = zfs_override_estimate_recordsize;
	} else if (os->os_phys->os_type == DMU_OST_ZVOL) {
		err = dsl_prop_get_int_ds(ds,
		    zfs_prop_to_name(ZFS_PROP_VOLBLOCKSIZE), &recordsize);
	} else {
		err = dsl_prop_get_int_ds(ds,
		    zfs_prop_to_name(ZFS_PROP_RECORDSIZE), &recordsize);
	}
	if (err != 0)
		return (err);
	record_count = uncompressed / recordsize;

	/*
	 * If we're estimating a send size for a compressed stream, use the
	 * compressed data size to estimate the stream size. Otherwise, use the
	 * uncompressed data size.
	 */
	size = stream_compressed ? compressed : uncompressed;

	/*
	 * Subtract out approximate space used by indirect blocks.
	 * Assume most space is used by data blocks (non-indirect, non-dnode).
	 * Assume no ditto blocks or internal fragmentation.
	 *
	 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
	 * block.
	 */
	size -= record_count * sizeof (blkptr_t);

	/* Add in the space for the record associated with each block. */
	size += record_count * sizeof (dmu_replay_record_t);

	*sizep = size;

	return (0);
}

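/*
 * Estimate the size of a full or incremental send stream from the
 * dataset's (or the snapshot delta's) compressed and uncompressed
 * byte counts.
 */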
int
dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds,
    boolean_t stream_compressed, uint64_t *sizep)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	int err;
	uint64_t uncomp, comp;

	ASSERT(dsl_pool_config_held(dp));

	/* tosnap must be a snapshot */
	if (!ds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/* fromsnap, if provided, must be a snapshot */
	if (fromds != NULL && !fromds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/*
	 * fromsnap must be an earlier snapshot from the same fs as tosnap,
	 * or the origin's fs.
	 */
	if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
		return (SET_ERROR(EXDEV));

	/* Get compressed and uncompressed size estimates of changed data. */
	if (fromds == NULL) {
		uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
		comp = dsl_dataset_phys(ds)->ds_compressed_bytes;
	} else {
		uint64_t used;
		err = dsl_dataset_space_written(fromds, ds,
		    &used, &comp, &uncomp);
		if (err != 0)
			return (err);
	}

	err = dmu_adjust_send_estimate_for_indirects(ds, uncomp, comp,
	    stream_compressed, sizep);
	/*
	 * Add the size of the BEGIN and END records to the estimate.
	 */
	*sizep += 2 * sizeof (dmu_replay_record_t);
	return (err);
}

struct calculate_send_arg {
	uint64_t uncompressed;
	uint64_t compressed;
};

/*
 * Simple callback used to traverse the blocks of a snapshot and sum their
 * uncompressed and compressed sizes.
 */
/* ARGSUSED */
static int
dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
{
	struct calculate_send_arg *space = arg;
	if (bp != NULL && !BP_IS_HOLE(bp)) {
		space->uncompressed += BP_GET_UCSIZE(bp);
		space->compressed += BP_GET_PSIZE(bp);
	}
	return (0);
}

/*
 * Given a destination snapshot and a TXG, calculate the approximate size of a
 * send stream sent from that TXG. from_txg may be zero, indicating that the
 * whole snapshot will be sent.
 */
int
dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
    boolean_t stream_compressed, uint64_t *sizep)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	int err;
	struct calculate_send_arg size = { 0 };

	ASSERT(dsl_pool_config_held(dp));

	/* tosnap must be a snapshot */
	if (!ds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/* verify that from_txg is before the provided snapshot was taken */
	if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
		return (SET_ERROR(EXDEV));
	}

	/*
	 * traverse the blocks of the snapshot with birth times after
	 * from_txg, summing their uncompressed size
	 */
	err = traverse_dataset(ds, from_txg,
	    TRAVERSE_POST | TRAVERSE_NO_DECRYPT,
	    dmu_calculate_send_traversal, &size);
	if (err)
		return (err);

	err = dmu_adjust_send_estimate_for_indirects(ds, size.uncompressed,
	    size.compressed, stream_compressed, sizep);
	return (err);
}