xref: /illumos-gate/usr/src/uts/common/fs/zfs/dmu.c (revision ea8dc4b6d2251b437950c0056bc626b311c73c27)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dbuf.h>
32 #include <sys/dnode.h>
33 #include <sys/zfs_context.h>
34 #include <sys/dmu_objset.h>
35 #include <sys/dmu_traverse.h>
36 #include <sys/dsl_dataset.h>
37 #include <sys/dsl_dir.h>
38 #include <sys/dsl_pool.h>
39 #include <sys/dmu_zfetch.h>
40 #include <sys/zfs_ioctl.h>
41 #include <sys/zap.h>
42 #include <sys/zio_checksum.h>
43 
44 const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
45 	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
46 	{	zap_byteswap,		TRUE,	"object directory"	},
47 	{	byteswap_uint64_array,	TRUE,	"object array"		},
48 	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
49 	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
50 	{	byteswap_uint64_array,	TRUE,	"bplist"		},
51 	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
52 	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
53 	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
54 	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
55 	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
56 	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
57 	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
58 	{	zap_byteswap,		TRUE,	"DSL directory child map"},
59 	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
60 	{	zap_byteswap,		TRUE,	"DSL props"		},
61 	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
62 	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
63 	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
64 	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
65 	{	zap_byteswap,		TRUE,	"ZFS directory"		},
66 	{	zap_byteswap,		TRUE,	"ZFS master node"	},
67 	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
68 	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
69 	{	zap_byteswap,		TRUE,	"zvol prop"		},
70 	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
71 	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
72 	{	zap_byteswap,		TRUE,	"other ZAP"		},
73 	{	zap_byteswap,		TRUE,	"persistent error log"	},
74 };
75 
76 int
77 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
78     void *tag, dmu_buf_t **dbp)
79 {
80 	dnode_t *dn;
81 	uint64_t blkid;
82 	dmu_buf_impl_t *db;
83 	int err;
84 
85 	/* dataset_verify(dd); */
86 
87 	err = dnode_hold(os->os, object, FTAG, &dn);
88 	if (err)
89 		return (err);
90 	blkid = dbuf_whichblock(dn, offset);
91 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
92 	db = dbuf_hold(dn, blkid, tag);
93 	rw_exit(&dn->dn_struct_rwlock);
94 	if (db == NULL) {
95 		err = EIO;
96 	} else {
97 		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
98 		if (err) {
99 			dbuf_rele(db, tag);
100 			db = NULL;
101 		}
102 	}
103 
104 	dnode_rele(dn, FTAG);
105 	*dbp = &db->db;
106 	return (err);
107 }
108 
109 int
110 dmu_bonus_max(void)
111 {
112 	return (DN_MAX_BONUSLEN);
113 }
114 
115 /*
116  * returns ENOENT, EIO, or 0.
117  */
118 int
119 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
120 {
121 	dnode_t *dn;
122 	int err, count;
123 	dmu_buf_impl_t *db;
124 
125 	err = dnode_hold(os->os, object, FTAG, &dn);
126 	if (err)
127 		return (err);
128 
129 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
130 	if (dn->dn_bonus == NULL) {
131 		rw_exit(&dn->dn_struct_rwlock);
132 		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
133 		if (dn->dn_bonus == NULL)
134 			dn->dn_bonus = dbuf_create_bonus(dn);
135 	}
136 	db = dn->dn_bonus;
137 	rw_exit(&dn->dn_struct_rwlock);
138 	mutex_enter(&db->db_mtx);
139 	count = refcount_add(&db->db_holds, tag);
140 	mutex_exit(&db->db_mtx);
141 	if (count == 1)
142 		dnode_add_ref(dn, db);
143 	dnode_rele(dn, FTAG);
144 
145 	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
146 
147 	*dbp = &db->db;
148 	return (0);
149 }
150 
151 int
152 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
153     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
154 {
155 	dnode_t *dn;
156 	dmu_buf_t **dbp;
157 	uint64_t blkid, nblks, i;
158 	uint32_t flags;
159 	int err;
160 	zio_t *zio;
161 
162 	ASSERT(length <= DMU_MAX_ACCESS);
163 
164 	if (length == 0) {
165 		if (numbufsp)
166 			*numbufsp = 0;
167 		*dbpp = NULL;
168 		return (0);
169 	}
170 
171 	flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT;
172 	if (length >= zfetch_array_rd_sz)
173 		flags |= DB_RF_NOPREFETCH;
174 
175 	err = dnode_hold(os->os, object, FTAG, &dn);
176 	if (err)
177 		return (err);
178 
179 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
180 	if (dn->dn_datablkshift) {
181 		int blkshift = dn->dn_datablkshift;
182 		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
183 			P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
184 	} else {
185 		ASSERT3U(offset + length, <=, dn->dn_datablksz);
186 		nblks = 1;
187 	}
188 	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
189 
190 	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, TRUE);
191 	blkid = dbuf_whichblock(dn, offset);
192 	for (i = 0; i < nblks; i++) {
193 		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
194 		if (db == NULL) {
195 			rw_exit(&dn->dn_struct_rwlock);
196 			dmu_buf_rele_array(dbp, nblks, tag);
197 			dnode_rele(dn, FTAG);
198 			zio_nowait(zio);
199 			return (EIO);
200 		}
201 		/* initiate async i/o */
202 		if (read && db->db_state == DB_UNCACHED) {
203 			rw_exit(&dn->dn_struct_rwlock);
204 			(void) dbuf_read(db, zio, flags);
205 			rw_enter(&dn->dn_struct_rwlock, RW_READER);
206 		}
207 		dbp[i] = &db->db;
208 	}
209 	rw_exit(&dn->dn_struct_rwlock);
210 	dnode_rele(dn, FTAG);
211 
212 	/* wait for async i/o */
213 	err = zio_wait(zio);
214 	if (err) {
215 		dmu_buf_rele_array(dbp, nblks, tag);
216 		return (err);
217 	}
218 
219 	/* wait for other io to complete */
220 	if (read) {
221 		for (i = 0; i < nblks; i++) {
222 			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
223 			mutex_enter(&db->db_mtx);
224 			while (db->db_state == DB_READ ||
225 			    db->db_state == DB_FILL)
226 				cv_wait(&db->db_changed, &db->db_mtx);
227 			if (db->db_state == DB_UNCACHED)
228 				err = EIO;
229 			mutex_exit(&db->db_mtx);
230 			if (err) {
231 				dmu_buf_rele_array(dbp, nblks, tag);
232 				return (err);
233 			}
234 		}
235 	}
236 
237 	*numbufsp = nblks;
238 	*dbpp = dbp;
239 	return (0);
240 }
241 
242 void
243 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
244 {
245 	int i;
246 	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
247 
248 	if (numbufs == 0)
249 		return;
250 
251 	for (i = 0; i < numbufs; i++) {
252 		if (dbp[i])
253 			dbuf_rele(dbp[i], tag);
254 	}
255 
256 	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
257 }
258 
259 void
260 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
261 {
262 	dnode_t *dn;
263 	uint64_t blkid;
264 	int nblks, i, err;
265 
266 	if (len == 0) {  /* they're interested in the bonus buffer */
267 		dn = os->os->os_meta_dnode;
268 
269 		if (object == 0 || object >= DN_MAX_OBJECT)
270 			return;
271 
272 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
273 		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
274 		dbuf_prefetch(dn, blkid);
275 		rw_exit(&dn->dn_struct_rwlock);
276 		return;
277 	}
278 
279 	/*
280 	 * XXX - Note, if the dnode for the requested object is not
281 	 * already cached, we will do a *synchronous* read in the
282 	 * dnode_hold() call.  The same is true for any indirects.
283 	 */
284 	err = dnode_hold(os->os, object, FTAG, &dn);
285 	if (err != 0)
286 		return;
287 
288 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
289 	if (dn->dn_datablkshift) {
290 		int blkshift = dn->dn_datablkshift;
291 		nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
292 			P2ALIGN(offset, 1<<blkshift)) >> blkshift;
293 	} else {
294 		nblks = (offset < dn->dn_datablksz);
295 	}
296 
297 	if (nblks != 0) {
298 		blkid = dbuf_whichblock(dn, offset);
299 		for (i = 0; i < nblks; i++)
300 			dbuf_prefetch(dn, blkid+i);
301 	}
302 
303 	rw_exit(&dn->dn_struct_rwlock);
304 
305 	dnode_rele(dn, FTAG);
306 }
307 
308 int
309 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
310     uint64_t size, dmu_tx_t *tx)
311 {
312 	dnode_t *dn;
313 	int err = dnode_hold(os->os, object, FTAG, &dn);
314 	if (err)
315 		return (err);
316 	ASSERT(offset < UINT64_MAX);
317 	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
318 	dnode_free_range(dn, offset, size, tx);
319 	dnode_rele(dn, FTAG);
320 	return (0);
321 }
322 
323 int
324 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
325     void *buf)
326 {
327 	dnode_t *dn;
328 	dmu_buf_t **dbp;
329 	int numbufs, i, err;
330 
331 	/*
332 	 * Deal with odd block sizes, where there can't be data past the
333 	 * first block.
334 	 */
335 	err = dnode_hold(os->os, object, FTAG, &dn);
336 	if (err)
337 		return (err);
338 	if (dn->dn_datablkshift == 0) {
339 		int newsz = offset > dn->dn_datablksz ? 0 :
340 		    MIN(size, dn->dn_datablksz - offset);
341 		bzero((char *)buf + newsz, size - newsz);
342 		size = newsz;
343 	}
344 	dnode_rele(dn, FTAG);
345 
346 	while (size > 0) {
347 		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
348 		int err;
349 
350 		/*
351 		 * NB: we could do this block-at-a-time, but it's nice
352 		 * to be reading in parallel.
353 		 */
354 		err = dmu_buf_hold_array(os, object, offset, mylen,
355 		    TRUE, FTAG, &numbufs, &dbp);
356 		if (err)
357 			return (err);
358 
359 		for (i = 0; i < numbufs; i++) {
360 			int tocpy;
361 			int bufoff;
362 			dmu_buf_t *db = dbp[i];
363 
364 			ASSERT(size > 0);
365 
366 			bufoff = offset - db->db_offset;
367 			tocpy = (int)MIN(db->db_size - bufoff, size);
368 
369 			bcopy((char *)db->db_data + bufoff, buf, tocpy);
370 
371 			offset += tocpy;
372 			size -= tocpy;
373 			buf = (char *)buf + tocpy;
374 		}
375 		dmu_buf_rele_array(dbp, numbufs, FTAG);
376 	}
377 	return (0);
378 }
379 
380 void
381 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
382     const void *buf, dmu_tx_t *tx)
383 {
384 	dmu_buf_t **dbp;
385 	int numbufs, i;
386 
387 	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
388 	    FALSE, FTAG, &numbufs, &dbp));
389 
390 	for (i = 0; i < numbufs; i++) {
391 		int tocpy;
392 		int bufoff;
393 		dmu_buf_t *db = dbp[i];
394 
395 		ASSERT(size > 0);
396 
397 		bufoff = offset - db->db_offset;
398 		tocpy = (int)MIN(db->db_size - bufoff, size);
399 
400 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
401 
402 		if (tocpy == db->db_size)
403 			dmu_buf_will_fill(db, tx);
404 		else
405 			dmu_buf_will_dirty(db, tx);
406 
407 		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
408 
409 		if (tocpy == db->db_size)
410 			dmu_buf_fill_done(db, tx);
411 
412 		offset += tocpy;
413 		size -= tocpy;
414 		buf = (char *)buf + tocpy;
415 	}
416 	dmu_buf_rele_array(dbp, numbufs, FTAG);
417 }
418 
419 #ifdef _KERNEL
420 int
421 dmu_write_uio(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
422     uio_t *uio, dmu_tx_t *tx)
423 {
424 	dmu_buf_t **dbp;
425 	int numbufs, i;
426 	int err = 0;
427 
428 	err = dmu_buf_hold_array(os, object, offset, size,
429 	    FALSE, FTAG, &numbufs, &dbp);
430 	if (err)
431 		return (err);
432 
433 	for (i = 0; i < numbufs; i++) {
434 		int tocpy;
435 		int bufoff;
436 		dmu_buf_t *db = dbp[i];
437 
438 		ASSERT(size > 0);
439 
440 		bufoff = offset - db->db_offset;
441 		tocpy = (int)MIN(db->db_size - bufoff, size);
442 
443 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
444 
445 		if (tocpy == db->db_size)
446 			dmu_buf_will_fill(db, tx);
447 		else
448 			dmu_buf_will_dirty(db, tx);
449 
450 		/*
451 		 * XXX uiomove could block forever (eg. nfs-backed
452 		 * pages).  There needs to be a uiolockdown() function
453 		 * to lock the pages in memory, so that uiomove won't
454 		 * block.
455 		 */
456 		err = uiomove((char *)db->db_data + bufoff, tocpy,
457 		    UIO_WRITE, uio);
458 
459 		if (tocpy == db->db_size)
460 			dmu_buf_fill_done(db, tx);
461 
462 		if (err)
463 			break;
464 
465 		offset += tocpy;
466 		size -= tocpy;
467 	}
468 	dmu_buf_rele_array(dbp, numbufs, FTAG);
469 	return (err);
470 }
471 #endif
472 
473 struct backuparg {
474 	dmu_replay_record_t *drr;
475 	vnode_t *vp;
476 	objset_t *os;
477 	zio_cksum_t zc;
478 	int err;
479 };
480 
481 static int
482 dump_bytes(struct backuparg *ba, void *buf, int len)
483 {
484 	ssize_t resid; /* have to get resid to get detailed errno */
485 	ASSERT3U(len % 8, ==, 0);
486 
487 	fletcher_4_incremental_native(buf, len, &ba->zc);
488 	ba->err = vn_rdwr(UIO_WRITE, ba->vp,
489 	    (caddr_t)buf, len,
490 	    0, UIO_SYSSPACE, FAPPEND, RLIM_INFINITY, CRED(), &resid);
491 	return (ba->err);
492 }
493 
494 static int
495 dump_free(struct backuparg *ba, uint64_t object, uint64_t offset,
496     uint64_t length)
497 {
498 	/* write a FREE record */
499 	bzero(ba->drr, sizeof (dmu_replay_record_t));
500 	ba->drr->drr_type = DRR_FREE;
501 	ba->drr->drr_u.drr_free.drr_object = object;
502 	ba->drr->drr_u.drr_free.drr_offset = offset;
503 	ba->drr->drr_u.drr_free.drr_length = length;
504 
505 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
506 		return (EINTR);
507 	return (0);
508 }
509 
510 static int
511 dump_data(struct backuparg *ba, dmu_object_type_t type,
512     uint64_t object, uint64_t offset, int blksz, void *data)
513 {
514 	/* write a DATA record */
515 	bzero(ba->drr, sizeof (dmu_replay_record_t));
516 	ba->drr->drr_type = DRR_WRITE;
517 	ba->drr->drr_u.drr_write.drr_object = object;
518 	ba->drr->drr_u.drr_write.drr_type = type;
519 	ba->drr->drr_u.drr_write.drr_offset = offset;
520 	ba->drr->drr_u.drr_write.drr_length = blksz;
521 
522 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
523 		return (EINTR);
524 	if (dump_bytes(ba, data, blksz))
525 		return (EINTR);
526 	return (0);
527 }
528 
529 static int
530 dump_freeobjects(struct backuparg *ba, uint64_t firstobj, uint64_t numobjs)
531 {
532 	/* write a FREEOBJECTS record */
533 	bzero(ba->drr, sizeof (dmu_replay_record_t));
534 	ba->drr->drr_type = DRR_FREEOBJECTS;
535 	ba->drr->drr_u.drr_freeobjects.drr_firstobj = firstobj;
536 	ba->drr->drr_u.drr_freeobjects.drr_numobjs = numobjs;
537 
538 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
539 		return (EINTR);
540 	return (0);
541 }
542 
543 static int
544 dump_dnode(struct backuparg *ba, uint64_t object, dnode_phys_t *dnp)
545 {
546 	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
547 		return (dump_freeobjects(ba, object, 1));
548 
549 	/* write an OBJECT record */
550 	bzero(ba->drr, sizeof (dmu_replay_record_t));
551 	ba->drr->drr_type = DRR_OBJECT;
552 	ba->drr->drr_u.drr_object.drr_object = object;
553 	ba->drr->drr_u.drr_object.drr_type = dnp->dn_type;
554 	ba->drr->drr_u.drr_object.drr_bonustype = dnp->dn_bonustype;
555 	ba->drr->drr_u.drr_object.drr_blksz =
556 	    dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
557 	ba->drr->drr_u.drr_object.drr_bonuslen = dnp->dn_bonuslen;
558 	ba->drr->drr_u.drr_object.drr_checksum = dnp->dn_checksum;
559 	ba->drr->drr_u.drr_object.drr_compress = dnp->dn_compress;
560 
561 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
562 		return (EINTR);
563 
564 	if (dump_bytes(ba, DN_BONUS(dnp), P2ROUNDUP(dnp->dn_bonuslen, 8)))
565 		return (EINTR);
566 
567 	/* free anything past the end of the file */
568 	if (dump_free(ba, object, (dnp->dn_maxblkid + 1) *
569 	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL))
570 		return (EINTR);
571 	if (ba->err)
572 		return (EINTR);
573 	return (0);
574 }
575 
/*
 * Number of bytes spanned by one block pointer at indirection `level' of
 * the dnode `dnp': the data block size (dn_datablkszsec sectors), scaled
 * up once per level by the number of block pointers in an indirect block.
 * Both arguments are parenthesized so any pointer expression may be
 * passed; `dnp' is evaluated twice.
 */
#define	BP_SPAN(dnp, level) \
	(((uint64_t)(dnp)->dn_datablkszsec) << (SPA_MINBLOCKSHIFT + \
	(level) * ((dnp)->dn_indblkshift - SPA_BLKPTRSHIFT)))
579 
580 static int
581 backup_cb(traverse_blk_cache_t *bc, spa_t *spa, void *arg)
582 {
583 	struct backuparg *ba = arg;
584 	uint64_t object = bc->bc_bookmark.zb_object;
585 	int level = bc->bc_bookmark.zb_level;
586 	uint64_t blkid = bc->bc_bookmark.zb_blkid;
587 	blkptr_t *bp = bc->bc_blkptr.blk_birth ? &bc->bc_blkptr : NULL;
588 	dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
589 	void *data = bc->bc_data;
590 	int err = 0;
591 
592 	if (issig(JUSTLOOKING) && issig(FORREAL))
593 		return (EINTR);
594 
595 	ASSERT(data || bp == NULL);
596 
597 	if (bp == NULL && object == 0) {
598 		uint64_t span = BP_SPAN(bc->bc_dnode, level);
599 		uint64_t dnobj = (blkid * span) >> DNODE_SHIFT;
600 		err = dump_freeobjects(ba, dnobj, span >> DNODE_SHIFT);
601 	} else if (bp == NULL) {
602 		uint64_t span = BP_SPAN(bc->bc_dnode, level);
603 		err = dump_free(ba, object, blkid * span, span);
604 	} else if (data && level == 0 && type == DMU_OT_DNODE) {
605 		dnode_phys_t *blk = data;
606 		int i;
607 		int blksz = BP_GET_LSIZE(bp);
608 
609 		for (i = 0; i < blksz >> DNODE_SHIFT; i++) {
610 			uint64_t dnobj =
611 			    (blkid << (DNODE_BLOCK_SHIFT - DNODE_SHIFT)) + i;
612 			err = dump_dnode(ba, dnobj, blk+i);
613 			if (err)
614 				break;
615 		}
616 	} else if (level == 0 &&
617 	    type != DMU_OT_DNODE && type != DMU_OT_OBJSET) {
618 		int blksz = BP_GET_LSIZE(bp);
619 		if (data == NULL) {
620 			arc_buf_t *abuf;
621 			zbookmark_t zb;
622 
623 			zb.zb_objset = ba->os->os->os_dsl_dataset->ds_object;
624 			zb.zb_object = object;
625 			zb.zb_level = level;
626 			zb.zb_blkid = blkid;
627 			(void) arc_read(NULL, spa, bp,
628 			    dmu_ot[type].ot_byteswap, arc_getbuf_func, &abuf,
629 			    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_MUSTSUCCEED,
630 			    ARC_WAIT, &zb);
631 
632 			if (abuf) {
633 				err = dump_data(ba, type, object, blkid * blksz,
634 				    blksz, abuf->b_data);
635 				(void) arc_buf_remove_ref(abuf, &abuf);
636 			}
637 		} else {
638 			err = dump_data(ba, type, object, blkid * blksz,
639 			    blksz, data);
640 		}
641 	}
642 
643 	ASSERT(err == 0 || err == EINTR);
644 	return (err);
645 }
646 
647 int
648 dmu_sendbackup(objset_t *tosnap, objset_t *fromsnap, vnode_t *vp)
649 {
650 	dsl_dataset_t *ds = tosnap->os->os_dsl_dataset;
651 	dsl_dataset_t *fromds = fromsnap ? fromsnap->os->os_dsl_dataset : NULL;
652 	dmu_replay_record_t *drr;
653 	struct backuparg ba;
654 	int err;
655 
656 	/* tosnap must be a snapshot */
657 	if (ds->ds_phys->ds_next_snap_obj == 0)
658 		return (EINVAL);
659 
660 	/* fromsnap must be an earlier snapshot from the same fs as tosnap */
661 	if (fromds && (ds->ds_dir != fromds->ds_dir ||
662 	    fromds->ds_phys->ds_creation_txg >=
663 	    ds->ds_phys->ds_creation_txg))
664 		return (EXDEV);
665 
666 	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
667 	drr->drr_type = DRR_BEGIN;
668 	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
669 	drr->drr_u.drr_begin.drr_version = DMU_BACKUP_VERSION;
670 	drr->drr_u.drr_begin.drr_creation_time =
671 	    ds->ds_phys->ds_creation_time;
672 	drr->drr_u.drr_begin.drr_type = tosnap->os->os_phys->os_type;
673 	drr->drr_u.drr_begin.drr_toguid = ds->ds_phys->ds_guid;
674 	if (fromds)
675 		drr->drr_u.drr_begin.drr_fromguid = fromds->ds_phys->ds_guid;
676 	dsl_dataset_name(ds, drr->drr_u.drr_begin.drr_toname);
677 
678 	ba.drr = drr;
679 	ba.vp = vp;
680 	ba.os = tosnap;
681 	ZIO_SET_CHECKSUM(&ba.zc, 0, 0, 0, 0);
682 
683 	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t))) {
684 		kmem_free(drr, sizeof (dmu_replay_record_t));
685 		return (ba.err);
686 	}
687 
688 	err = traverse_dsl_dataset(ds,
689 	    fromds ? fromds->ds_phys->ds_creation_txg : 0,
690 	    ADVANCE_PRE | ADVANCE_HOLES | ADVANCE_DATA | ADVANCE_NOLOCK,
691 	    backup_cb, &ba);
692 
693 	if (err) {
694 		if (err == EINTR && ba.err)
695 			err = ba.err;
696 		return (err);
697 	}
698 
699 	bzero(drr, sizeof (dmu_replay_record_t));
700 	drr->drr_type = DRR_END;
701 	drr->drr_u.drr_end.drr_checksum = ba.zc;
702 
703 	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t)))
704 		return (ba.err);
705 
706 	kmem_free(drr, sizeof (dmu_replay_record_t));
707 
708 	return (0);
709 }
710 
711 struct restorearg {
712 	int err;
713 	int byteswap;
714 	vnode_t *vp;
715 	char *buf;
716 	uint64_t voff;
717 	int buflen; /* number of valid bytes in buf */
718 	int bufoff; /* next offset to read */
719 	int bufsize; /* amount of memory allocated for buf */
720 	zio_cksum_t zc;
721 };
722 
723 static int
724 replay_incremental_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
725 {
726 	struct drr_begin *drrb = arg;
727 	dsl_dataset_t *ds = NULL;
728 	dsl_dataset_t *ds_prev = NULL;
729 	const char *snapname;
730 	int err = EINVAL;
731 	uint64_t val;
732 
733 	/* this must be a filesytem */
734 	if (dd->dd_phys->dd_head_dataset_obj == 0)
735 		goto die;
736 
737 	err = dsl_dataset_open_obj(dd->dd_pool,
738 	    dd->dd_phys->dd_head_dataset_obj,
739 	    NULL, DS_MODE_EXCLUSIVE, FTAG, &ds);
740 	if (err)
741 		goto die;
742 
743 	if (ds == NULL) {
744 		err = EBUSY;
745 		goto die;
746 	}
747 
748 	/* must already be a snapshot of this fs */
749 	if (ds->ds_phys->ds_prev_snap_obj == 0) {
750 		err = ENODEV;
751 		goto die;
752 	}
753 
754 	/* most recent snapshot must match fromguid */
755 	err = dsl_dataset_open_obj(dd->dd_pool,
756 	    ds->ds_phys->ds_prev_snap_obj, NULL,
757 	    DS_MODE_STANDARD | DS_MODE_READONLY, FTAG, &ds_prev);
758 	if (err)
759 		goto die;
760 	if (ds_prev->ds_phys->ds_guid != drrb->drr_fromguid) {
761 		err = ENODEV;
762 		goto die;
763 	}
764 
765 	/* must not have any changes since most recent snapshot */
766 	if (ds->ds_phys->ds_bp.blk_birth >
767 	    ds_prev->ds_phys->ds_creation_txg) {
768 		err = ETXTBSY;
769 		goto die;
770 	}
771 
772 	/* new snapshot name must not exist */
773 	snapname = strrchr(drrb->drr_toname, '@');
774 	if (snapname == NULL) {
775 		err = EEXIST;
776 		goto die;
777 	}
778 	snapname++;
779 	err = zap_lookup(dd->dd_pool->dp_meta_objset,
780 	    ds->ds_phys->ds_snapnames_zapobj, snapname, 8, 1, &val);
781 	if (err != ENOENT) {
782 		if (err == 0)
783 			err = EEXIST;
784 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
785 		dsl_dataset_close(ds_prev, DS_MODE_STANDARD, FTAG);
786 		return (err);
787 	}
788 
789 	dsl_dataset_close(ds_prev, DS_MODE_STANDARD, FTAG);
790 
791 	/* The point of no (unsuccessful) return. */
792 
793 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
794 	ds->ds_phys->ds_restoring = TRUE;
795 
796 	dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
797 	return (0);
798 
799 die:
800 	if (ds_prev)
801 		dsl_dataset_close(ds_prev, DS_MODE_STANDARD, FTAG);
802 	if (ds)
803 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
804 	return (err);
805 }
806 
807 static int
808 replay_full_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
809 {
810 	struct drr_begin *drrb = arg;
811 	int err;
812 	char *fsfullname, *fslastname, *cp;
813 	dsl_dataset_t *ds;
814 
815 	fsfullname = kmem_alloc(MAXNAMELEN, KM_SLEEP);
816 	(void) strncpy(fsfullname, drrb->drr_toname, MAXNAMELEN);
817 	cp = strchr(fsfullname, '@');
818 	if (cp == NULL) {
819 		kmem_free(fsfullname, MAXNAMELEN);
820 		return (EINVAL);
821 	}
822 	*cp = '\0';
823 	fslastname = strrchr(fsfullname, '/');
824 	if (fslastname == NULL) {
825 		kmem_free(fsfullname, MAXNAMELEN);
826 		return (EINVAL);
827 	}
828 	fslastname++;
829 
830 	err = dsl_dataset_create_sync(dd, fsfullname, fslastname, NULL, tx);
831 	if (err) {
832 		kmem_free(fsfullname, MAXNAMELEN);
833 		return (err);
834 	}
835 
836 	/* the point of no (unsuccessful) return */
837 
838 	VERIFY(0 == dsl_dataset_open_spa(dd->dd_pool->dp_spa, fsfullname,
839 	    DS_MODE_EXCLUSIVE, FTAG, &ds));
840 	kmem_free(fsfullname, MAXNAMELEN);
841 
842 	(void) dmu_objset_create_impl(dsl_dataset_get_spa(ds),
843 	    ds, drrb->drr_type, tx);
844 
845 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
846 	ds->ds_phys->ds_restoring = TRUE;
847 
848 	dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
849 	return (0);
850 }
851 
852 static int
853 replay_end_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
854 {
855 	struct drr_begin *drrb = arg;
856 	int err;
857 	char *snapname;
858 	dsl_dataset_t *ds;
859 
860 	/* XXX verify that drr_toname is in dd */
861 
862 	snapname = strchr(drrb->drr_toname, '@');
863 	if (snapname == NULL)
864 		return (EINVAL);
865 	snapname++;
866 
867 	/* create snapshot */
868 	err = dsl_dataset_snapshot_sync(dd, snapname, tx);
869 	if (err)
870 		return (err);
871 
872 	/* set snapshot's creation time and guid */
873 	VERIFY(0 == dsl_dataset_open_spa(dd->dd_pool->dp_spa, drrb->drr_toname,
874 	    DS_MODE_PRIMARY | DS_MODE_READONLY | DS_MODE_RESTORE, FTAG, &ds));
875 
876 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
877 	ds->ds_phys->ds_creation_time = drrb->drr_creation_time;
878 	ds->ds_phys->ds_guid = drrb->drr_toguid;
879 	ds->ds_phys->ds_restoring = FALSE;
880 
881 	dsl_dataset_close(ds, DS_MODE_PRIMARY, FTAG);
882 
883 	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
884 	    dd->dd_phys->dd_head_dataset_obj,
885 	    NULL, DS_MODE_STANDARD | DS_MODE_RESTORE, FTAG, &ds));
886 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
887 	ds->ds_phys->ds_restoring = FALSE;
888 	dsl_dataset_close(ds, DS_MODE_STANDARD, FTAG);
889 
890 	return (0);
891 }
892 
893 void *
894 restore_read(struct restorearg *ra, int len)
895 {
896 	void *rv;
897 
898 	/* some things will require 8-byte alignment, so everything must */
899 	ASSERT3U(len % 8, ==, 0);
900 
901 	while (ra->buflen - ra->bufoff < len) {
902 		ssize_t resid;
903 		int leftover = ra->buflen - ra->bufoff;
904 
905 		(void) memmove(ra->buf, ra->buf + ra->bufoff, leftover);
906 		ra->err = vn_rdwr(UIO_READ, ra->vp,
907 		    (caddr_t)ra->buf + leftover, ra->bufsize - leftover,
908 		    ra->voff, UIO_SYSSPACE, FAPPEND,
909 		    RLIM_INFINITY, CRED(), &resid);
910 
911 		ra->voff += ra->bufsize - leftover - resid;
912 		ra->buflen = ra->bufsize - resid;
913 		ra->bufoff = 0;
914 		if (resid == ra->bufsize - leftover)
915 			ra->err = EINVAL;
916 		if (ra->err)
917 			return (NULL);
918 		/* Could compute checksum here? */
919 	}
920 
921 	ASSERT3U(ra->bufoff % 8, ==, 0);
922 	ASSERT3U(ra->buflen - ra->bufoff, >=, len);
923 	rv = ra->buf + ra->bufoff;
924 	ra->bufoff += len;
925 	if (ra->byteswap)
926 		fletcher_4_incremental_byteswap(rv, len, &ra->zc);
927 	else
928 		fletcher_4_incremental_native(rv, len, &ra->zc);
929 	return (rv);
930 }
931 
932 static void
933 backup_byteswap(dmu_replay_record_t *drr)
934 {
935 #define	DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
936 #define	DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
937 	drr->drr_type = BSWAP_32(drr->drr_type);
938 	switch (drr->drr_type) {
939 	case DRR_BEGIN:
940 		DO64(drr_begin.drr_magic);
941 		DO64(drr_begin.drr_version);
942 		DO64(drr_begin.drr_creation_time);
943 		DO32(drr_begin.drr_type);
944 		DO64(drr_begin.drr_toguid);
945 		DO64(drr_begin.drr_fromguid);
946 		break;
947 	case DRR_OBJECT:
948 		DO64(drr_object.drr_object);
949 		/* DO64(drr_object.drr_allocation_txg); */
950 		DO32(drr_object.drr_type);
951 		DO32(drr_object.drr_bonustype);
952 		DO32(drr_object.drr_blksz);
953 		DO32(drr_object.drr_bonuslen);
954 		break;
955 	case DRR_FREEOBJECTS:
956 		DO64(drr_freeobjects.drr_firstobj);
957 		DO64(drr_freeobjects.drr_numobjs);
958 		break;
959 	case DRR_WRITE:
960 		DO64(drr_write.drr_object);
961 		DO32(drr_write.drr_type);
962 		DO64(drr_write.drr_offset);
963 		DO64(drr_write.drr_length);
964 		break;
965 	case DRR_FREE:
966 		DO64(drr_free.drr_object);
967 		DO64(drr_free.drr_offset);
968 		DO64(drr_free.drr_length);
969 		break;
970 	case DRR_END:
971 		DO64(drr_end.drr_checksum.zc_word[0]);
972 		DO64(drr_end.drr_checksum.zc_word[1]);
973 		DO64(drr_end.drr_checksum.zc_word[2]);
974 		DO64(drr_end.drr_checksum.zc_word[3]);
975 		break;
976 	}
977 #undef DO64
978 #undef DO32
979 }
980 
981 static int
982 restore_object(struct restorearg *ra, objset_t *os, struct drr_object *drro)
983 {
984 	int err;
985 	dmu_tx_t *tx;
986 
987 	err = dmu_object_info(os, drro->drr_object, NULL);
988 
989 	if (err != 0 && err != ENOENT)
990 		return (EINVAL);
991 
992 	if (drro->drr_type == DMU_OT_NONE ||
993 	    drro->drr_type >= DMU_OT_NUMTYPES ||
994 	    drro->drr_bonustype >= DMU_OT_NUMTYPES ||
995 	    drro->drr_checksum >= ZIO_CHECKSUM_FUNCTIONS ||
996 	    drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
997 	    P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
998 	    drro->drr_blksz < SPA_MINBLOCKSIZE ||
999 	    drro->drr_blksz > SPA_MAXBLOCKSIZE ||
1000 	    drro->drr_bonuslen > DN_MAX_BONUSLEN) {
1001 		return (EINVAL);
1002 	}
1003 
1004 	tx = dmu_tx_create(os);
1005 
1006 	if (err == ENOENT) {
1007 		/* currently free, want to be allocated */
1008 		dmu_tx_hold_bonus(tx, DMU_NEW_OBJECT);
1009 		dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, 1);
1010 		err = dmu_tx_assign(tx, TXG_WAIT);
1011 		if (err) {
1012 			dmu_tx_abort(tx);
1013 			return (err);
1014 		}
1015 		err = dmu_object_claim(os, drro->drr_object,
1016 		    drro->drr_type, drro->drr_blksz,
1017 		    drro->drr_bonustype, drro->drr_bonuslen, tx);
1018 	} else {
1019 		/* currently allocated, want to be allocated */
1020 		dmu_tx_hold_bonus(tx, drro->drr_object);
1021 		/*
1022 		 * We may change blocksize, so need to
1023 		 * hold_write
1024 		 */
1025 		dmu_tx_hold_write(tx, drro->drr_object, 0, 1);
1026 		err = dmu_tx_assign(tx, TXG_WAIT);
1027 		if (err) {
1028 			dmu_tx_abort(tx);
1029 			return (err);
1030 		}
1031 
1032 		err = dmu_object_reclaim(os, drro->drr_object,
1033 		    drro->drr_type, drro->drr_blksz,
1034 		    drro->drr_bonustype, drro->drr_bonuslen, tx);
1035 	}
1036 	if (err) {
1037 		dmu_tx_commit(tx);
1038 		return (EINVAL);
1039 	}
1040 
1041 	dmu_object_set_checksum(os, drro->drr_object, drro->drr_checksum, tx);
1042 	dmu_object_set_compress(os, drro->drr_object, drro->drr_compress, tx);
1043 
1044 	if (drro->drr_bonuslen) {
1045 		dmu_buf_t *db;
1046 		void *data;
1047 		VERIFY(0 == dmu_bonus_hold(os, drro->drr_object, FTAG, &db));
1048 		dmu_buf_will_dirty(db, tx);
1049 
1050 		ASSERT3U(db->db_size, ==, drro->drr_bonuslen);
1051 		data = restore_read(ra, P2ROUNDUP(db->db_size, 8));
1052 		if (data == NULL) {
1053 			dmu_tx_commit(tx);
1054 			return (ra->err);
1055 		}
1056 		bcopy(data, db->db_data, db->db_size);
1057 		if (ra->byteswap) {
1058 			dmu_ot[drro->drr_bonustype].ot_byteswap(db->db_data,
1059 			    drro->drr_bonuslen);
1060 		}
1061 		dmu_buf_rele(db, FTAG);
1062 	}
1063 	dmu_tx_commit(tx);
1064 	return (0);
1065 }
1066 
1067 /* ARGSUSED */
1068 static int
1069 restore_freeobjects(struct restorearg *ra, objset_t *os,
1070     struct drr_freeobjects *drrfo)
1071 {
1072 	uint64_t obj;
1073 
1074 	if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
1075 		return (EINVAL);
1076 
1077 	for (obj = drrfo->drr_firstobj;
1078 	    obj < drrfo->drr_firstobj + drrfo->drr_numobjs; obj++) {
1079 		dmu_tx_t *tx;
1080 		int err;
1081 
1082 		if (dmu_object_info(os, obj, NULL) != 0)
1083 			continue;
1084 
1085 		tx = dmu_tx_create(os);
1086 		dmu_tx_hold_bonus(tx, obj);
1087 		err = dmu_tx_assign(tx, TXG_WAIT);
1088 		if (err) {
1089 			dmu_tx_abort(tx);
1090 			return (err);
1091 		}
1092 		err = dmu_object_free(os, obj, tx);
1093 		dmu_tx_commit(tx);
1094 		if (err && err != ENOENT)
1095 			return (EINVAL);
1096 	}
1097 	return (0);
1098 }
1099 
1100 static int
1101 restore_write(struct restorearg *ra, objset_t *os,
1102     struct drr_write *drrw)
1103 {
1104 	dmu_tx_t *tx;
1105 	void *data;
1106 	int err;
1107 
1108 	if (drrw->drr_offset + drrw->drr_length < drrw->drr_offset ||
1109 	    drrw->drr_type >= DMU_OT_NUMTYPES)
1110 		return (EINVAL);
1111 
1112 	data = restore_read(ra, drrw->drr_length);
1113 	if (data == NULL)
1114 		return (ra->err);
1115 
1116 	if (dmu_object_info(os, drrw->drr_object, NULL) != 0)
1117 		return (EINVAL);
1118 
1119 	tx = dmu_tx_create(os);
1120 
1121 	dmu_tx_hold_write(tx, drrw->drr_object,
1122 	    drrw->drr_offset, drrw->drr_length);
1123 	err = dmu_tx_assign(tx, TXG_WAIT);
1124 	if (err) {
1125 		dmu_tx_abort(tx);
1126 		return (err);
1127 	}
1128 	if (ra->byteswap)
1129 		dmu_ot[drrw->drr_type].ot_byteswap(data, drrw->drr_length);
1130 	dmu_write(os, drrw->drr_object,
1131 	    drrw->drr_offset, drrw->drr_length, data, tx);
1132 	dmu_tx_commit(tx);
1133 	return (0);
1134 }
1135 
1136 /* ARGSUSED */
1137 static int
1138 restore_free(struct restorearg *ra, objset_t *os,
1139     struct drr_free *drrf)
1140 {
1141 	dmu_tx_t *tx;
1142 	int err;
1143 
1144 	if (drrf->drr_length != -1ULL &&
1145 	    drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
1146 		return (EINVAL);
1147 
1148 	if (dmu_object_info(os, drrf->drr_object, NULL) != 0)
1149 		return (EINVAL);
1150 
1151 	tx = dmu_tx_create(os);
1152 
1153 	dmu_tx_hold_free(tx, drrf->drr_object,
1154 	    drrf->drr_offset, drrf->drr_length);
1155 	err = dmu_tx_assign(tx, TXG_WAIT);
1156 	if (err) {
1157 		dmu_tx_abort(tx);
1158 		return (err);
1159 	}
1160 	err = dmu_free_range(os, drrf->drr_object,
1161 	    drrf->drr_offset, drrf->drr_length, tx);
1162 	dmu_tx_commit(tx);
1163 	return (err);
1164 }
1165 
/*
 * Receive a backup stream and apply it to the dataset named by tosnap.
 *
 *	tosnap	Name of the snapshot being received.  A name without an
 *		'@' is rejected with EINVAL.  The string is temporarily
 *		truncated at the '@' (and then restored) each time the
 *		containing dataset has to be opened by name.
 *	drrb	The stream's begin record, supplied by the caller.  It is
 *		byteswapped in place when the stream's magic number is in
 *		the opposite byte order, and its drr_toname is overwritten
 *		with tosnap for the benefit of the sync functions.
 *	sizep	If non-NULL, receives the final value of ra.voff.
 *	vp	Vnode stored in the restorearg for restore_read().
 *	voffset	Initial value of ra.voff.
 *
 * The begin record is processed in syncing context, then records are read
 * and dispatched to the restore_*() handlers until a DRR_END record, an
 * error, or a pending signal (EINTR).  If an error occurs after the objset
 * was opened, the dataset is rolled back (incremental stream) or destroyed
 * (full stream) so that it is not left in the restoring state.
 *
 * Returns 0 on success or an errno value (including ECKSUM when the
 * checksum carried by DRR_END does not match the one computed here).
 */
int
dmu_recvbackup(char *tosnap, struct drr_begin *drrb, uint64_t *sizep,
    vnode_t *vp, uint64_t voffset)
{
	struct restorearg ra;
	dmu_replay_record_t *drr;
	char *cp;
	dsl_dir_t *dd = NULL;
	objset_t *os = NULL;
	zio_cksum_t pzc;

	bzero(&ra, sizeof (ra));
	ra.vp = vp;
	ra.voff = voffset;
	ra.bufsize = 1<<20;
	ra.buf = kmem_alloc(ra.bufsize, KM_SLEEP);

	/* The magic number tells us whether the stream needs byteswapping. */
	if (drrb->drr_magic == DMU_BACKUP_MAGIC) {
		ra.byteswap = FALSE;
	} else if (drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
		ra.byteswap = TRUE;
	} else {
		ra.err = EINVAL;
		goto out;
	}

	/*
	 * Rebuild the begin record in ra.buf and fold it into the running
	 * stream checksum (ra.zc) before drrb is modified below.
	 *
	 * NB: this assumes that struct drr_begin will be the largest in
	 * dmu_replay_record_t's drr_u, and thus we don't need to pad it
	 * with zeros to make it the same length as we wrote out.
	 */
	((dmu_replay_record_t *)ra.buf)->drr_type = DRR_BEGIN;
	((dmu_replay_record_t *)ra.buf)->drr_pad = 0;
	((dmu_replay_record_t *)ra.buf)->drr_u.drr_begin = *drrb;
	if (ra.byteswap) {
		fletcher_4_incremental_byteswap(ra.buf,
		    sizeof (dmu_replay_record_t), &ra.zc);
	} else {
		fletcher_4_incremental_native(ra.buf,
		    sizeof (dmu_replay_record_t), &ra.zc);
	}
	/*
	 * NOTE(review): this copy is not bounded by the size of drr_toname;
	 * presumably the caller guarantees tosnap fits -- confirm.
	 */
	(void) strcpy(drrb->drr_toname, tosnap); /* for the sync funcs */

	/* Bring the begin record's numeric fields into native byte order. */
	if (ra.byteswap) {
		drrb->drr_magic = BSWAP_64(drrb->drr_magic);
		drrb->drr_version = BSWAP_64(drrb->drr_version);
		drrb->drr_creation_time = BSWAP_64(drrb->drr_creation_time);
		drrb->drr_type = BSWAP_32(drrb->drr_type);
		drrb->drr_toguid = BSWAP_64(drrb->drr_toguid);
		drrb->drr_fromguid = BSWAP_64(drrb->drr_fromguid);
	}

	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);

	/*
	 * Validate the begin record.  The '@' check also guarantees that the
	 * strchr(tosnap, '@') calls below cannot return NULL.
	 */
	if (drrb->drr_version != DMU_BACKUP_VERSION ||
	    drrb->drr_type >= DMU_OST_NUMTYPES ||
	    strchr(drrb->drr_toname, '@') == NULL) {
		ra.err = EINVAL;
		goto out;
	}

	/*
	 * Process the begin in syncing context.
	 */
	if (drrb->drr_fromguid) {
		/* incremental backup */

		cp = strchr(tosnap, '@');
		*cp = '\0';
		ra.err = dsl_dir_open(tosnap, FTAG, &dd, NULL);
		*cp = '@';
		if (ra.err)
			goto out;

		ra.err = dsl_dir_sync_task(dd, replay_incremental_sync,
		    drrb, 1<<20);
	} else {
		/* full backup */
		const char *tail;

		cp = strchr(tosnap, '@');
		*cp = '\0';
		ra.err = dsl_dir_open(tosnap, FTAG, &dd, &tail);
		*cp = '@';
		if (ra.err)
			goto out;
		/* No unresolved tail is reported as EEXIST for a full stream. */
		if (tail == NULL) {
			ra.err = EEXIST;
			goto out;
		}

		ra.err = dsl_dir_sync_task(dd, replay_full_sync,
		    drrb, 1<<20);
	}
	if (ra.err)
		goto out;

	/*
	 * Open the objset we are modifying.
	 */

	cp = strchr(tosnap, '@');
	*cp = '\0';
	ra.err = dmu_objset_open(tosnap, DMU_OST_ANY,
	    DS_MODE_PRIMARY | DS_MODE_RESTORE, &os);
	*cp = '@';
	/*
	 * NOTE(review): an open failure is only checked by this assertion.
	 * If it can fail where assertions are compiled out, the loop below
	 * is skipped (ra.err != 0) and, assuming os is still NULL, so is the
	 * rollback/destroy at 'out' -- confirm.
	 */
	ASSERT3U(ra.err, ==, 0);

	/*
	 * Read records and process them.
	 */
	pzc = ra.zc;
	while (ra.err == 0 &&
	    NULL != (drr = restore_read(&ra, sizeof (*drr)))) {
		/* Give up with EINTR if a signal is pending. */
		if (issig(JUSTLOOKING) && issig(FORREAL)) {
			ra.err = EINTR;
			goto out;
		}

		if (ra.byteswap)
			backup_byteswap(drr);

		switch (drr->drr_type) {
		case DRR_OBJECT:
		{
			/*
			 * We need to make a copy of the record header,
			 * because restore_{object,write} may need to
			 * restore_read(), which will invalidate drr.
			 */
			struct drr_object drro = drr->drr_u.drr_object;
			ra.err = restore_object(&ra, os, &drro);
			break;
		}
		case DRR_FREEOBJECTS:
		{
			struct drr_freeobjects drrfo =
			    drr->drr_u.drr_freeobjects;
			ra.err = restore_freeobjects(&ra, os, &drrfo);
			break;
		}
		case DRR_WRITE:
		{
			struct drr_write drrw = drr->drr_u.drr_write;
			ra.err = restore_write(&ra, os, &drrw);
			break;
		}
		case DRR_FREE:
		{
			struct drr_free drrf = drr->drr_u.drr_free;
			ra.err = restore_free(&ra, os, &drrf);
			break;
		}
		case DRR_END:
		{
			struct drr_end drre = drr->drr_u.drr_end;
			/*
			 * We compare against the *previous* checksum
			 * value, because the stored checksum is of
			 * everything before the DRR_END record.
			 *
			 * A stored checksum whose first word is zero is
			 * not compared at all.
			 */
			if (drre.drr_checksum.zc_word[0] != 0 &&
			    ((drre.drr_checksum.zc_word[0] - pzc.zc_word[0]) |
			    (drre.drr_checksum.zc_word[1] - pzc.zc_word[1]) |
			    (drre.drr_checksum.zc_word[2] - pzc.zc_word[2]) |
			    (drre.drr_checksum.zc_word[3] - pzc.zc_word[3]))) {
				ra.err = ECKSUM;
				goto out;
			}

			/*
			 * dd may be the parent of the dd we are
			 * restoring into (eg. if it's a full backup).
			 */
			ra.err = dsl_dir_sync_task(dmu_objset_ds(os)->
			    ds_dir, replay_end_sync, drrb, 1<<20);
			goto out;
		}
		default:
			ra.err = EINVAL;
			goto out;
		}
		/* Snapshot the checksum as of the end of this record. */
		pzc = ra.zc;
	}

out:
	/* 'os' is used below only as a flag; it is not dereferenced again. */
	if (os)
		dmu_objset_close(os);

	/*
	 * Make sure we don't rollback/destroy unless we actually
	 * processed the begin properly.  'os' will only be set if this
	 * is the case.
	 */
	if (ra.err && os && dd && tosnap && strchr(tosnap, '@')) {
		/*
		 * rollback or destroy what we created, so we don't
		 * leave it in the restoring state.
		 */
		txg_wait_synced(dd->dd_pool, 0);
		if (drrb->drr_fromguid) {
			/* incremental: rollback to most recent snapshot */
			(void) dsl_dir_sync_task(dd,
			    dsl_dataset_rollback_sync, NULL, 0);
		} else {
			/* full: destroy whole fs */
			cp = strchr(tosnap, '@');
			*cp = '\0';
			/* Pass everything after the first '/' to the destroy. */
			cp = strchr(tosnap, '/');
			if (cp) {
				(void) dsl_dir_sync_task(dd,
				    dsl_dir_destroy_sync, cp+1, 0);
			}
			/* Put the '@' back where the string was truncated. */
			cp = strchr(tosnap, '\0');
			*cp = '@';
		}

	}

	if (dd)
		dsl_dir_close(dd, FTAG);
	kmem_free(ra.buf, ra.bufsize);
	if (sizep)
		*sizep = ra.voff;
	return (ra.err);
}
1392 
/*
 * Intent log support: sync the block at <os, object, offset> to disk.
 * N.B. and XXX: the caller is responsible for serializing dmu_sync()s
 * of the same block, and for making sure that the data isn't changing
 * while dmu_sync() is writing it.
 *
 * On entry the caller must hold tx_suspend, *bp must be a hole, and txg
 * must be non-zero (all three are asserted).
 *
 * Return values:
 *
 *	EALREADY: this txg has already been synced, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	EINPROGRESS: the block is in the process of being synced by the
 *		usual mechanism (spa_sync()), so we can't sync it here.
 *		The caller should txg_wait_synced() and not log the write.
 *
 *	EBUSY: another thread is trying to dmu_sync() the same dbuf.
 *		(This case cannot arise under the current locking rules.)
 *		The caller should txg_wait_synced() and not log the write.
 *
 *	ESTALE: the block was dirtied or freed while we were writing it,
 *		so the data is no longer valid.
 *		The caller should txg_wait_synced() and not log the write.
 *
 *	0: success.  Sets *bp to the blkptr just written, and sets
 *		*blkoff to the data's offset within that block.
 *		The caller should log this blkptr/blkoff in its lr_write_t.
 *
 * An error from dmu_buf_hold() is also returned as-is.
 */
int
dmu_sync(objset_t *os, uint64_t object, uint64_t offset, uint64_t *blkoff,
    blkptr_t *bp, uint64_t txg)
{
	dsl_pool_t *dp = os->os->os_dsl_dataset->ds_dir->dd_pool;
	tx_state_t *tx = &dp->dp_tx;
	dmu_buf_impl_t *db;
	blkptr_t *blk;
	int err;
	zbookmark_t zb;

	ASSERT(RW_LOCK_HELD(&tx->tx_suspend));
	ASSERT(BP_IS_HOLE(bp));
	ASSERT(txg != 0);

	dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
	    txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);

	/*
	 * XXX why is this routine using dmu_buf_*() and casting between
	 * dmu_buf_impl_t and dmu_buf_t?
	 */

	/*
	 * If this txg already synced, there's nothing to do.
	 */
	if (txg <= tx->tx_synced_txg) {
		/*
		 * If we're running ziltest, we need the blkptr regardless.
		 */
		if (txg > spa_freeze_txg(dp->dp_spa)) {
			err = dmu_buf_hold(os, object, offset,
			    FTAG, (dmu_buf_t **)&db);
			if (err)
				return (err);
			/* if db_blkptr == NULL, this was an empty write */
			if (db->db_blkptr)
				*bp = *db->db_blkptr; /* structure assignment */
			else
				bzero(bp, sizeof (blkptr_t));
			*blkoff = offset - db->db.db_offset;
			ASSERT3U(*blkoff, <, db->db.db_size);
			dmu_buf_rele((dmu_buf_t *)db, FTAG);
			return (0);
		}
		return (EALREADY);
	}

	/*
	 * If this txg is in the middle of syncing, just wait for it.
	 */
	if (txg == tx->tx_syncing_txg) {
		ASSERT(txg != tx->tx_open_txg);
		return (EINPROGRESS);
	}

	err = dmu_buf_hold(os, object, offset, FTAG, (dmu_buf_t **)&db);
	if (err)
		return (err);

	mutex_enter(&db->db_mtx);

	/*
	 * If this dbuf isn't dirty, must have been free_range'd.
	 * There's no need to log writes to freed blocks, so we're done.
	 */
	if (!list_link_active(&db->db_dirty_node[txg&TXG_MASK])) {
		mutex_exit(&db->db_mtx);
		dmu_buf_rele((dmu_buf_t *)db, FTAG);
		return (ENOENT);
	}

	blk = db->db_d.db_overridden_by[txg&TXG_MASK];

	/*
	 * If we already did a dmu_sync() of this dbuf in this txg,
	 * free the old block before writing the new one.
	 */
	if (blk != NULL) {
		ASSERT(blk != IN_DMU_SYNC);
		if (blk == IN_DMU_SYNC) {
			mutex_exit(&db->db_mtx);
			dmu_buf_rele((dmu_buf_t *)db, FTAG);
			return (EBUSY);
		}
		arc_release(db->db_d.db_data_old[txg&TXG_MASK], db);
		if (!BP_IS_HOLE(blk)) {
			(void) arc_free(NULL, os->os->os_spa, txg, blk,
			    NULL, NULL, ARC_WAIT);
		}
		kmem_free(blk, sizeof (blkptr_t));
	}

	/*
	 * Flag this txg's slot as being written by dmu_sync(), then drop
	 * db_mtx for the duration of the write.  The slot is re-checked
	 * under the mutex once the write has returned.
	 */
	db->db_d.db_overridden_by[txg&TXG_MASK] = IN_DMU_SYNC;
	mutex_exit(&db->db_mtx);

	blk = kmem_alloc(sizeof (blkptr_t), KM_SLEEP);
	blk->blk_birth = 0; /* mark as invalid */

	/* Bookmark identifying the block, passed through to arc_write(). */
	zb.zb_objset = os->os->os_dsl_dataset->ds_object;
	zb.zb_object = db->db.db_object;
	zb.zb_level = db->db_level;
	zb.zb_blkid = db->db_blkid;
	err = arc_write(NULL, os->os->os_spa,
	    zio_checksum_select(db->db_dnode->dn_checksum, os->os->os_checksum),
	    zio_compress_select(db->db_dnode->dn_compress, os->os->os_compress),
	    txg, blk, db->db_d.db_data_old[txg&TXG_MASK], NULL, NULL,
	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, ARC_WAIT, &zb);
	ASSERT(err == 0);

	/* A non-hole result is tagged as a level-0 block of the dnode's type. */
	if (!BP_IS_HOLE(blk)) {
		blk->blk_fill = 1;
		BP_SET_TYPE(blk, db->db_dnode->dn_type);
		BP_SET_LEVEL(blk, 0);
	}

	/* copy the block pointer back to caller */
	*bp = *blk; /* structure assignment */
	*blkoff = offset - db->db.db_offset;
	ASSERT3U(*blkoff, <, db->db.db_size);

	mutex_enter(&db->db_mtx);
	if (db->db_d.db_overridden_by[txg&TXG_MASK] != IN_DMU_SYNC) {
		/* we were dirtied/freed during the sync */
		ASSERT3P(db->db_d.db_overridden_by[txg&TXG_MASK], ==, NULL);
		arc_release(db->db_d.db_data_old[txg&TXG_MASK], db);
		mutex_exit(&db->db_mtx);
		dmu_buf_rele((dmu_buf_t *)db, FTAG);
		/* Note that this block does not free on disk until txg syncs */

		/*
		 * XXX can we use ARC_NOWAIT here?
		 * XXX should we be ignoring the return code?
		 */
		if (!BP_IS_HOLE(blk)) {
			(void) arc_free(NULL, os->os->os_spa, txg, blk,
			    NULL, NULL, ARC_WAIT);
		}
		kmem_free(blk, sizeof (blkptr_t));
		return (ESTALE);
	}

	/* Record the block we wrote as this txg's override for the dbuf. */
	db->db_d.db_overridden_by[txg&TXG_MASK] = blk;
	mutex_exit(&db->db_mtx);
	dmu_buf_rele((dmu_buf_t *)db, FTAG);
	ASSERT3U(txg, >, tx->tx_syncing_txg);
	return (0);
}
1571 
1572 uint64_t
1573 dmu_object_max_nonzero_offset(objset_t *os, uint64_t object)
1574 {
1575 	dnode_t *dn;
1576 
1577 	/* XXX assumes dnode_hold will not get an i/o error */
1578 	(void) dnode_hold(os->os, object, FTAG, &dn);
1579 	uint64_t rv = dnode_max_nonzero_offset(dn);
1580 	dnode_rele(dn, FTAG);
1581 	return (rv);
1582 }
1583 
1584 int
1585 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
1586 	dmu_tx_t *tx)
1587 {
1588 	dnode_t *dn;
1589 	int err;
1590 
1591 	err = dnode_hold(os->os, object, FTAG, &dn);
1592 	if (err)
1593 		return (err);
1594 	err = dnode_set_blksz(dn, size, ibs, tx);
1595 	dnode_rele(dn, FTAG);
1596 	return (err);
1597 }
1598 
1599 void
1600 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
1601 	dmu_tx_t *tx)
1602 {
1603 	dnode_t *dn;
1604 
1605 	/* XXX assumes dnode_hold will not get an i/o error */
1606 	(void) dnode_hold(os->os, object, FTAG, &dn);
1607 	ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
1608 	dn->dn_checksum = checksum;
1609 	dnode_setdirty(dn, tx);
1610 	dnode_rele(dn, FTAG);
1611 }
1612 
1613 void
1614 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
1615 	dmu_tx_t *tx)
1616 {
1617 	dnode_t *dn;
1618 
1619 	/* XXX assumes dnode_hold will not get an i/o error */
1620 	(void) dnode_hold(os->os, object, FTAG, &dn);
1621 	ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
1622 	dn->dn_compress = compress;
1623 	dnode_setdirty(dn, tx);
1624 	dnode_rele(dn, FTAG);
1625 }
1626 
1627 int
1628 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
1629 {
1630 	dnode_t *dn;
1631 	int i, err;
1632 
1633 	err = dnode_hold(os->os, object, FTAG, &dn);
1634 	if (err)
1635 		return (err);
1636 	/*
1637 	 * Sync any current changes before
1638 	 * we go trundling through the block pointers.
1639 	 */
1640 	for (i = 0; i < TXG_SIZE; i++) {
1641 		if (dn->dn_dirtyblksz[i])
1642 			break;
1643 	}
1644 	if (i != TXG_SIZE) {
1645 		dnode_rele(dn, FTAG);
1646 		txg_wait_synced(dmu_objset_pool(os), 0);
1647 		err = dnode_hold(os->os, object, FTAG, &dn);
1648 		if (err)
1649 			return (err);
1650 	}
1651 
1652 	err = dnode_next_offset(dn, hole, off, 1, 1);
1653 	dnode_rele(dn, FTAG);
1654 
1655 	return (err);
1656 }
1657 
1658 void
1659 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
1660 {
1661 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
1662 	mutex_enter(&dn->dn_mtx);
1663 
1664 	doi->doi_data_block_size = dn->dn_datablksz;
1665 	doi->doi_metadata_block_size = dn->dn_indblkshift ?
1666 	    1ULL << dn->dn_indblkshift : 0;
1667 	doi->doi_indirection = dn->dn_nlevels;
1668 	doi->doi_checksum = dn->dn_checksum;
1669 	doi->doi_compress = dn->dn_compress;
1670 	doi->doi_physical_blks = dn->dn_phys->dn_secphys;
1671 	doi->doi_max_block_offset = dn->dn_phys->dn_maxblkid;
1672 	doi->doi_type = dn->dn_type;
1673 	doi->doi_bonus_size = dn->dn_bonuslen;
1674 	doi->doi_bonus_type = dn->dn_bonustype;
1675 
1676 	mutex_exit(&dn->dn_mtx);
1677 	rw_exit(&dn->dn_struct_rwlock);
1678 }
1679 
1680 /*
1681  * Get information on a DMU object.
1682  * If doi is NULL, just indicates whether the object exists.
1683  */
1684 int
1685 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
1686 {
1687 	dnode_t *dn;
1688 	int err = dnode_hold(os->os, object, FTAG, &dn);
1689 
1690 	if (err)
1691 		return (err);
1692 
1693 	if (doi != NULL)
1694 		dmu_object_info_from_dnode(dn, doi);
1695 
1696 	dnode_rele(dn, FTAG);
1697 	return (0);
1698 }
1699 
1700 /*
1701  * As above, but faster; can be used when you have a held dbuf in hand.
1702  */
1703 void
1704 dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
1705 {
1706 	dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
1707 }
1708 
1709 /*
1710  * Faster still when you only care about the size.
1711  * This is specifically optimized for zfs_getattr().
1712  */
1713 void
1714 dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
1715 {
1716 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
1717 
1718 	*blksize = dn->dn_datablksz;
1719 	*nblk512 = dn->dn_phys->dn_secphys + 1;	/* add 1 for dnode space */
1720 }
1721 
1722 /*
1723  * Given a bookmark, return the name of the dataset, object, and range in
1724  * human-readable format.
1725  */
1726 int
1727 spa_bookmark_name(spa_t *spa, zbookmark_t *zb, char *dsname, size_t dslen,
1728     char *objname, size_t objlen, char *range, size_t rangelen)
1729 {
1730 	dsl_pool_t *dp;
1731 	dsl_dataset_t *ds = NULL;
1732 	objset_t *os = NULL;
1733 	dnode_t *dn = NULL;
1734 	int err, shift;
1735 
1736 	if (dslen < MAXNAMELEN || objlen < 32 || rangelen < 64)
1737 		return (ENOSPC);
1738 
1739 	dp = spa_get_dsl(spa);
1740 	if (zb->zb_objset != 0) {
1741 		rw_enter(&dp->dp_config_rwlock, RW_READER);
1742 		err = dsl_dataset_open_obj(dp, zb->zb_objset,
1743 		    NULL, DS_MODE_NONE, FTAG, &ds);
1744 		if (err) {
1745 			rw_exit(&dp->dp_config_rwlock);
1746 			return (err);
1747 		}
1748 		dsl_dataset_name(ds, dsname);
1749 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1750 		rw_exit(&dp->dp_config_rwlock);
1751 
1752 		err = dmu_objset_open(dsname, DMU_OST_ANY, DS_MODE_NONE, &os);
1753 		if (err)
1754 			goto out;
1755 
1756 	} else {
1757 		dsl_dataset_name(NULL, dsname);
1758 		os = dp->dp_meta_objset;
1759 	}
1760 
1761 
1762 	if (zb->zb_object == DMU_META_DNODE_OBJECT) {
1763 		(void) strncpy(objname, "mdn", objlen);
1764 	} else {
1765 		(void) snprintf(objname, objlen, "%lld",
1766 		    (longlong_t)zb->zb_object);
1767 	}
1768 
1769 	err = dnode_hold(os->os, zb->zb_object, FTAG, &dn);
1770 	if (err)
1771 		goto out;
1772 
1773 	shift = (dn->dn_datablkshift?dn->dn_datablkshift:SPA_MAXBLOCKSHIFT) +
1774 	    zb->zb_level * (dn->dn_indblkshift - SPA_BLKPTRSHIFT);
1775 	(void) snprintf(range, rangelen, "%llu-%llu",
1776 	    (u_longlong_t)(zb->zb_blkid << shift),
1777 	    (u_longlong_t)((zb->zb_blkid+1) << shift));
1778 
1779 out:
1780 	if (dn)
1781 		dnode_rele(dn, FTAG);
1782 	if (os && os != dp->dp_meta_objset)
1783 		dmu_objset_close(os);
1784 	return (err);
1785 }
1786 
/*
 * Byteswap, in place, every 64-bit word of the buffer at vbuf.
 * size is the buffer length in bytes and must be a multiple of 8
 * (asserted).
 */
void
byteswap_uint64_array(void *vbuf, size_t size)
{
	uint64_t *buf = vbuf;
	size_t count = size >> 3;
	size_t i;	/* same type as count: no mixed-sign compare */

	ASSERT((size & 7) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_64(buf[i]);
}
1799 
/*
 * Byteswap, in place, every 32-bit word of the buffer at vbuf.
 * size is the buffer length in bytes and must be a multiple of 4
 * (asserted).
 */
void
byteswap_uint32_array(void *vbuf, size_t size)
{
	uint32_t *buf = vbuf;
	size_t count = size >> 2;
	size_t i;	/* same type as count: no mixed-sign compare */

	ASSERT((size & 3) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_32(buf[i]);
}
1812 
/*
 * Byteswap, in place, every 16-bit word of the buffer at vbuf.
 * size is the buffer length in bytes and must be a multiple of 2
 * (asserted).
 */
void
byteswap_uint16_array(void *vbuf, size_t size)
{
	uint16_t *buf = vbuf;
	size_t count = size >> 1;
	size_t i;	/* same type as count: no mixed-sign compare */

	ASSERT((size & 1) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_16(buf[i]);
}
1825 
/*
 * Byteswap routine for byte-sized data.  A single byte has no byte order,
 * so this is deliberately a no-op; both arguments are unused.  It appears
 * in the dmu_ot[] table as the byteswap function for such object types.
 */
/* ARGSUSED */
void
byteswap_uint8_array(void *vbuf, size_t size)
{
}
1831 
/*
 * Initialize the DMU by initializing, in this order, the dbuf, dnode
 * and ARC subsystems.
 */
void
dmu_init(void)
{
	dbuf_init();
	dnode_init();
	arc_init();
}
1839 
/*
 * Tear down the DMU: finish the ARC, dnode and dbuf subsystems, the
 * reverse of the order in which dmu_init() initializes them.
 */
void
dmu_fini(void)
{
	arc_fini();
	dnode_fini();
	dbuf_fini();
}
1847