xref: /illumos-gate/usr/src/uts/common/fs/zfs/dmu.c (revision e19302335c33c8c6e0b0b5e426fc1f6352c84b5d)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dbuf.h>
32 #include <sys/dnode.h>
33 #include <sys/zfs_context.h>
34 #include <sys/dmu_objset.h>
35 #include <sys/dmu_traverse.h>
36 #include <sys/dsl_dataset.h>
37 #include <sys/dsl_dir.h>
38 #include <sys/dsl_pool.h>
39 #include <sys/dmu_zfetch.h>
40 #include <sys/zfs_ioctl.h>
41 #include <sys/zap.h>
42 #include <sys/zio_checksum.h>
43 
44 const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
45 	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
46 	{	zap_byteswap,		TRUE,	"object directory"	},
47 	{	byteswap_uint64_array,	TRUE,	"object array"		},
48 	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
49 	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
50 	{	byteswap_uint64_array,	TRUE,	"bplist"		},
51 	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
52 	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
53 	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
54 	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
55 	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
56 	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
57 	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
58 	{	zap_byteswap,		TRUE,	"DSL directory child map"},
59 	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
60 	{	zap_byteswap,		TRUE,	"DSL props"		},
61 	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
62 	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
63 	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
64 	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
65 	{	zap_byteswap,		TRUE,	"ZFS directory"		},
66 	{	zap_byteswap,		TRUE,	"ZFS master node"	},
67 	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
68 	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
69 	{	zap_byteswap,		TRUE,	"zvol prop"		},
70 	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
71 	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
72 	{	zap_byteswap,		TRUE,	"other ZAP"		},
73 	{	zap_byteswap,		TRUE,	"persistent error log"	},
74 };
75 
76 int
77 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
78     void *tag, dmu_buf_t **dbp)
79 {
80 	dnode_t *dn;
81 	uint64_t blkid;
82 	dmu_buf_impl_t *db;
83 	int err;
84 
85 	/* dataset_verify(dd); */
86 
87 	err = dnode_hold(os->os, object, FTAG, &dn);
88 	if (err)
89 		return (err);
90 	blkid = dbuf_whichblock(dn, offset);
91 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
92 	db = dbuf_hold(dn, blkid, tag);
93 	rw_exit(&dn->dn_struct_rwlock);
94 	if (db == NULL) {
95 		err = EIO;
96 	} else {
97 		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
98 		if (err) {
99 			dbuf_rele(db, tag);
100 			db = NULL;
101 		}
102 	}
103 
104 	dnode_rele(dn, FTAG);
105 	*dbp = &db->db; /* NULL db: &db->db is still NULL (first field) */
106 	return (err);
107 }
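
/*
 * Illustrative sketch (editorial, not part of the original file): the
 * typical dmu_buf_hold()/dmu_buf_rele() pattern.  dmu_buf_hold() above
 * reads the block in synchronously (DB_RF_CANFAIL with no parent zio),
 * so on success db->db_data is valid for as long as the hold is kept.
 */
static int
example_peek_first_word(objset_t *os, uint64_t object, uint64_t *valp)
{
	dmu_buf_t *db;
	int err;

	err = dmu_buf_hold(os, object, 0, FTAG, &db);
	if (err)
		return (err);
	*valp = *(uint64_t *)db->db_data;	/* valid while held */
	dmu_buf_rele(db, FTAG);
	return (0);
}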
108 
109 int
110 dmu_bonus_max(void)
111 {
112 	return (DN_MAX_BONUSLEN);
113 }
114 
115 /*
116  * returns ENOENT, EIO, or 0.
117  */
118 int
119 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
120 {
121 	dnode_t *dn;
122 	int err, count;
123 	dmu_buf_impl_t *db;
124 
125 	err = dnode_hold(os->os, object, FTAG, &dn);
126 	if (err)
127 		return (err);
128 
129 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
130 	if (dn->dn_bonus == NULL) {
131 		rw_exit(&dn->dn_struct_rwlock);
132 		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
133 		if (dn->dn_bonus == NULL)
134 			dn->dn_bonus = dbuf_create_bonus(dn);
135 	}
136 	db = dn->dn_bonus;
137 	rw_exit(&dn->dn_struct_rwlock);
138 	mutex_enter(&db->db_mtx);
139 	count = refcount_add(&db->db_holds, tag);
140 	mutex_exit(&db->db_mtx);
141 	if (count == 1)
142 		dnode_add_ref(dn, db);
143 	dnode_rele(dn, FTAG);
144 
145 	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
146 
147 	*dbp = &db->db;
148 	return (0);
149 }
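
/*
 * Illustrative sketch (editorial, not part of the original file):
 * consumers such as the ZPL keep small per-object metadata in the bonus
 * buffer and copy it out like this.  The function name is hypothetical.
 */
static int
example_read_bonus(objset_t *os, uint64_t object, void *buf, int buflen)
{
	dmu_buf_t *db;
	int err;

	err = dmu_bonus_hold(os, object, FTAG, &db);
	if (err)
		return (err);
	bcopy(db->db_data, buf, MIN(buflen, (int)db->db_size));
	dmu_buf_rele(db, FTAG);
	return (0);
}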
150 
151 int
152 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
153     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
154 {
155 	dnode_t *dn;
156 	dmu_buf_t **dbp;
157 	uint64_t blkid, nblks, i;
158 	uint32_t flags;
159 	int err;
160 	zio_t *zio;
161 
162 	ASSERT(length <= DMU_MAX_ACCESS);
163 
164 	if (length == 0) {
165 		if (numbufsp)
166 			*numbufsp = 0;
167 		*dbpp = NULL;
168 		return (0);
169 	}
170 
171 	flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT;
172 	if (length > zfetch_array_rd_sz)
173 		flags |= DB_RF_NOPREFETCH;
174 
175 	err = dnode_hold(os->os, object, FTAG, &dn);
176 	if (err)
177 		return (err);
178 
179 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
180 	if (dn->dn_datablkshift) {
181 		int blkshift = dn->dn_datablkshift;
182 		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
183 			P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
184 	} else {
185 		ASSERT3U(offset + length, <=, dn->dn_datablksz);
186 		nblks = 1;
187 	}
188 	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
189 
190 	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, TRUE);
191 	blkid = dbuf_whichblock(dn, offset);
192 	for (i = 0; i < nblks; i++) {
193 		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
194 		if (db == NULL) {
195 			rw_exit(&dn->dn_struct_rwlock);
196 			dmu_buf_rele_array(dbp, nblks, tag);
197 			dnode_rele(dn, FTAG);
198 			zio_nowait(zio);
199 			return (EIO);
200 		}
201 		/* initiate async i/o */
202 		if (read && db->db_state == DB_UNCACHED) {
203 			rw_exit(&dn->dn_struct_rwlock);
204 			(void) dbuf_read(db, zio, flags);
205 			rw_enter(&dn->dn_struct_rwlock, RW_READER);
206 		}
207 		dbp[i] = &db->db;
208 	}
209 	rw_exit(&dn->dn_struct_rwlock);
210 	dnode_rele(dn, FTAG);
211 
212 	/* wait for async i/o */
213 	err = zio_wait(zio);
214 	if (err) {
215 		dmu_buf_rele_array(dbp, nblks, tag);
216 		return (err);
217 	}
218 
219 	/* wait for other i/o to complete */
220 	if (read) {
221 		for (i = 0; i < nblks; i++) {
222 			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
223 			mutex_enter(&db->db_mtx);
224 			while (db->db_state == DB_READ ||
225 			    db->db_state == DB_FILL)
226 				cv_wait(&db->db_changed, &db->db_mtx);
227 			if (db->db_state == DB_UNCACHED)
228 				err = EIO;
229 			mutex_exit(&db->db_mtx);
230 			if (err) {
231 				dmu_buf_rele_array(dbp, nblks, tag);
232 				return (err);
233 			}
234 		}
235 	}
236 
237 	*numbufsp = nblks;
238 	*dbpp = dbp;
239 	return (0);
240 }
241 
242 void
243 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
244 {
245 	int i;
246 	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
247 
248 	if (numbufs == 0)
249 		return;
250 
251 	for (i = 0; i < numbufs; i++) {
252 		if (dbp[i])
253 			dbuf_rele(dbp[i], tag);
254 	}
255 
256 	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
257 }
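
/*
 * Illustrative sketch (editorial, not part of the original file):
 * dmu_buf_hold_array() and dmu_buf_rele_array() bracket multi-block
 * access, as dmu_read() does below.  Holds are taken on every block
 * covering [offset, offset + length).
 */
static int
example_count_blocks(objset_t *os, uint64_t object, uint64_t offset,
    uint64_t length, int *nblksp)
{
	dmu_buf_t **dbp;
	int numbufs, err;

	err = dmu_buf_hold_array(os, object, offset, length,
	    TRUE, FTAG, &numbufs, &dbp);
	if (err)
		return (err);
	*nblksp = numbufs;
	dmu_buf_rele_array(dbp, numbufs, FTAG);
	return (0);
}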
258 
259 void
260 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
261 {
262 	dnode_t *dn;
263 	uint64_t blkid;
264 	int nblks, i, err;
265 
266 	if (len == 0) {  /* they're interested in the bonus buffer */
267 		dn = os->os->os_meta_dnode;
268 
269 		if (object == 0 || object >= DN_MAX_OBJECT)
270 			return;
271 
272 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
273 		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
274 		dbuf_prefetch(dn, blkid);
275 		rw_exit(&dn->dn_struct_rwlock);
276 		return;
277 	}
278 
279 	/*
280 	 * XXX - Note, if the dnode for the requested object is not
281 	 * already cached, we will do a *synchronous* read in the
282 	 * dnode_hold() call.  The same is true for any indirects.
283 	 */
284 	err = dnode_hold(os->os, object, FTAG, &dn);
285 	if (err != 0)
286 		return;
287 
288 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
289 	if (dn->dn_datablkshift) {
290 		int blkshift = dn->dn_datablkshift;
291 		nblks = (P2ROUNDUP(offset+len, 1ULL<<blkshift) -
292 			P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
293 	} else {
294 		nblks = (offset < dn->dn_datablksz);
295 	}
296 
297 	if (nblks != 0) {
298 		blkid = dbuf_whichblock(dn, offset);
299 		for (i = 0; i < nblks; i++)
300 			dbuf_prefetch(dn, blkid+i);
301 	}
302 
303 	rw_exit(&dn->dn_struct_rwlock);
304 
305 	dnode_rele(dn, FTAG);
306 }
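
/*
 * Illustrative sketch (editorial, not part of the original file): a
 * sequential reader can overlap I/O with computation by prefetching the
 * next chunk before consuming the current one.  EXAMPLE_CHUNK is a
 * hypothetical constant.
 */
#define	EXAMPLE_CHUNK	(1ULL << 17)	/* 128K, hypothetical */
static int
example_read_ahead(objset_t *os, uint64_t object, uint64_t off, void *buf)
{
	dmu_prefetch(os, object, off + EXAMPLE_CHUNK, EXAMPLE_CHUNK);
	return (dmu_read(os, object, off, EXAMPLE_CHUNK, buf));
}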
307 
308 int
309 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
310     uint64_t size, dmu_tx_t *tx)
311 {
312 	dnode_t *dn;
313 	int err = dnode_hold(os->os, object, FTAG, &dn);
314 	if (err)
315 		return (err);
316 	ASSERT(offset < UINT64_MAX);
317 	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
318 	dnode_free_range(dn, offset, size, tx);
319 	dnode_rele(dn, FTAG);
320 	return (0);
321 }
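
/*
 * Illustrative sketch (editorial, not part of the original file): like
 * all modifications, a free must run inside an assigned transaction;
 * dmu_tx_hold_free() declares the range, exactly as restore_free() does
 * later in this file.
 */
static int
example_punch_hole(objset_t *os, uint64_t object, uint64_t off, uint64_t len)
{
	dmu_tx_t *tx = dmu_tx_create(os);
	int err;

	dmu_tx_hold_free(tx, object, off, len);
	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err) {
		dmu_tx_abort(tx);
		return (err);
	}
	err = dmu_free_range(os, object, off, len, tx);
	dmu_tx_commit(tx);
	return (err);
}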
322 
323 int
324 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
325     void *buf)
326 {
327 	dnode_t *dn;
328 	dmu_buf_t **dbp;
329 	int numbufs, i, err;
330 
331 	/*
332 	 * Deal with odd block sizes, where there can't be data past the
333 	 * first block.
334 	 */
335 	err = dnode_hold(os->os, object, FTAG, &dn);
336 	if (err)
337 		return (err);
338 	if (dn->dn_datablkshift == 0) {
339 		int newsz = offset > dn->dn_datablksz ? 0 :
340 		    MIN(size, dn->dn_datablksz - offset);
341 		bzero((char *)buf + newsz, size - newsz);
342 		size = newsz;
343 	}
344 	dnode_rele(dn, FTAG);
345 
346 	while (size > 0) {
347 		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
348 		int err;
349 
350 		/*
351 		 * NB: we could do this block-at-a-time, but it's nice
352 		 * to be reading in parallel.
353 		 */
354 		err = dmu_buf_hold_array(os, object, offset, mylen,
355 		    TRUE, FTAG, &numbufs, &dbp);
356 		if (err)
357 			return (err);
358 
359 		for (i = 0; i < numbufs; i++) {
360 			int tocpy;
361 			int bufoff;
362 			dmu_buf_t *db = dbp[i];
363 
364 			ASSERT(size > 0);
365 
366 			bufoff = offset - db->db_offset;
367 			tocpy = (int)MIN(db->db_size - bufoff, size);
368 
369 			bcopy((char *)db->db_data + bufoff, buf, tocpy);
370 
371 			offset += tocpy;
372 			size -= tocpy;
373 			buf = (char *)buf + tocpy;
374 		}
375 		dmu_buf_rele_array(dbp, numbufs, FTAG);
376 	}
377 	return (0);
378 }
379 
380 void
381 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
382     const void *buf, dmu_tx_t *tx)
383 {
384 	dmu_buf_t **dbp;
385 	int numbufs, i;
386 
387 	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
388 	    FALSE, FTAG, &numbufs, &dbp));
389 
390 	for (i = 0; i < numbufs; i++) {
391 		int tocpy;
392 		int bufoff;
393 		dmu_buf_t *db = dbp[i];
394 
395 		ASSERT(size > 0);
396 
397 		bufoff = offset - db->db_offset;
398 		tocpy = (int)MIN(db->db_size - bufoff, size);
399 
400 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
401 
402 		if (tocpy == db->db_size)
403 			dmu_buf_will_fill(db, tx);
404 		else
405 			dmu_buf_will_dirty(db, tx);
406 
407 		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
408 
409 		if (tocpy == db->db_size)
410 			dmu_buf_fill_done(db, tx);
411 
412 		offset += tocpy;
413 		size -= tocpy;
414 		buf = (char *)buf + tocpy;
415 	}
416 	dmu_buf_rele_array(dbp, numbufs, FTAG);
417 }
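
/*
 * Illustrative sketch (editorial, not part of the original file):
 * dmu_write() requires a transaction that covers the range, so callers
 * pair it with dmu_tx_hold_write(), mirroring restore_write() below.
 */
static int
example_write(objset_t *os, uint64_t object, uint64_t off, int len,
    const void *buf)
{
	dmu_tx_t *tx = dmu_tx_create(os);
	int err;

	dmu_tx_hold_write(tx, object, off, len);
	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err) {
		dmu_tx_abort(tx);
		return (err);
	}
	dmu_write(os, object, off, len, buf, tx);
	dmu_tx_commit(tx);
	return (0);
}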
418 
419 #ifdef _KERNEL
420 int
421 dmu_write_uio(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
422     uio_t *uio, dmu_tx_t *tx)
423 {
424 	dmu_buf_t **dbp;
425 	int numbufs, i;
426 	int err = 0;
427 
428 	err = dmu_buf_hold_array(os, object, offset, size,
429 	    FALSE, FTAG, &numbufs, &dbp);
430 	if (err)
431 		return (err);
432 
433 	for (i = 0; i < numbufs; i++) {
434 		int tocpy;
435 		int bufoff;
436 		dmu_buf_t *db = dbp[i];
437 
438 		ASSERT(size > 0);
439 
440 		bufoff = offset - db->db_offset;
441 		tocpy = (int)MIN(db->db_size - bufoff, size);
442 
443 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
444 
445 		if (tocpy == db->db_size)
446 			dmu_buf_will_fill(db, tx);
447 		else
448 			dmu_buf_will_dirty(db, tx);
449 
450 		/*
451 		 * XXX uiomove could block forever (eg. nfs-backed
452 		 * pages).  There needs to be a uiolockdown() function
453 		 * to lock the pages in memory, so that uiomove won't
454 		 * block.
455 		 */
456 		err = uiomove((char *)db->db_data + bufoff, tocpy,
457 		    UIO_WRITE, uio);
458 
459 		if (tocpy == db->db_size)
460 			dmu_buf_fill_done(db, tx);
461 
462 		if (err)
463 			break;
464 
465 		offset += tocpy;
466 		size -= tocpy;
467 	}
468 	dmu_buf_rele_array(dbp, numbufs, FTAG);
469 	return (err);
470 }
471 #endif
472 
473 struct backuparg {
474 	dmu_replay_record_t *drr;
475 	vnode_t *vp;
476 	objset_t *os;
477 	zio_cksum_t zc;
478 	int err;
479 };
480 
481 static int
482 dump_bytes(struct backuparg *ba, void *buf, int len)
483 {
484 	ssize_t resid; /* have to get resid to get detailed errno */
485 	ASSERT3U(len % 8, ==, 0);
486 
487 	fletcher_4_incremental_native(buf, len, &ba->zc);
488 	ba->err = vn_rdwr(UIO_WRITE, ba->vp,
489 	    (caddr_t)buf, len,
490 	    0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);
491 	return (ba->err);
492 }
493 
494 static int
495 dump_free(struct backuparg *ba, uint64_t object, uint64_t offset,
496     uint64_t length)
497 {
498 	/* write a FREE record */
499 	bzero(ba->drr, sizeof (dmu_replay_record_t));
500 	ba->drr->drr_type = DRR_FREE;
501 	ba->drr->drr_u.drr_free.drr_object = object;
502 	ba->drr->drr_u.drr_free.drr_offset = offset;
503 	ba->drr->drr_u.drr_free.drr_length = length;
504 
505 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
506 		return (EINTR);
507 	return (0);
508 }
509 
510 static int
511 dump_data(struct backuparg *ba, dmu_object_type_t type,
512     uint64_t object, uint64_t offset, int blksz, void *data)
513 {
514 	/* write a DATA record */
515 	bzero(ba->drr, sizeof (dmu_replay_record_t));
516 	ba->drr->drr_type = DRR_WRITE;
517 	ba->drr->drr_u.drr_write.drr_object = object;
518 	ba->drr->drr_u.drr_write.drr_type = type;
519 	ba->drr->drr_u.drr_write.drr_offset = offset;
520 	ba->drr->drr_u.drr_write.drr_length = blksz;
521 
522 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
523 		return (EINTR);
524 	if (dump_bytes(ba, data, blksz))
525 		return (EINTR);
526 	return (0);
527 }
528 
529 static int
530 dump_freeobjects(struct backuparg *ba, uint64_t firstobj, uint64_t numobjs)
531 {
532 	/* write a FREEOBJECTS record */
533 	bzero(ba->drr, sizeof (dmu_replay_record_t));
534 	ba->drr->drr_type = DRR_FREEOBJECTS;
535 	ba->drr->drr_u.drr_freeobjects.drr_firstobj = firstobj;
536 	ba->drr->drr_u.drr_freeobjects.drr_numobjs = numobjs;
537 
538 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
539 		return (EINTR);
540 	return (0);
541 }
542 
543 static int
544 dump_dnode(struct backuparg *ba, uint64_t object, dnode_phys_t *dnp)
545 {
546 	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
547 		return (dump_freeobjects(ba, object, 1));
548 
549 	/* write an OBJECT record */
550 	bzero(ba->drr, sizeof (dmu_replay_record_t));
551 	ba->drr->drr_type = DRR_OBJECT;
552 	ba->drr->drr_u.drr_object.drr_object = object;
553 	ba->drr->drr_u.drr_object.drr_type = dnp->dn_type;
554 	ba->drr->drr_u.drr_object.drr_bonustype = dnp->dn_bonustype;
555 	ba->drr->drr_u.drr_object.drr_blksz =
556 	    dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
557 	ba->drr->drr_u.drr_object.drr_bonuslen = dnp->dn_bonuslen;
558 	ba->drr->drr_u.drr_object.drr_checksum = dnp->dn_checksum;
559 	ba->drr->drr_u.drr_object.drr_compress = dnp->dn_compress;
560 
561 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
562 		return (EINTR);
563 
564 	if (dump_bytes(ba, DN_BONUS(dnp), P2ROUNDUP(dnp->dn_bonuslen, 8)))
565 		return (EINTR);
566 
567 	/* free anything past the end of the file */
568 	if (dump_free(ba, object, (dnp->dn_maxblkid + 1) *
569 	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL))
570 		return (EINTR);
571 	if (ba->err)
572 		return (EINTR);
573 	return (0);
574 }
575 
576 #define	BP_SPAN(dnp, level) \
577 	(((uint64_t)dnp->dn_datablkszsec) << (SPA_MINBLOCKSHIFT + \
578 	(level) * (dnp->dn_indblkshift - SPA_BLKPTRSHIFT)))
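
/*
 * Worked example (editorial): with 128K data blocks (dn_datablkszsec ==
 * 256) and 16K indirect blocks (dn_indblkshift == 14), each indirect
 * block holds 1 << (14 - SPA_BLKPTRSHIFT) == 128 block pointers, so
 * BP_SPAN(dnp, 0) == 256 << 9 == 128K and
 * BP_SPAN(dnp, 1) == 128 * 128K == 16M of file data per level-1 block.
 */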
579 
580 static int
581 backup_cb(traverse_blk_cache_t *bc, spa_t *spa, void *arg)
582 {
583 	struct backuparg *ba = arg;
584 	uint64_t object = bc->bc_bookmark.zb_object;
585 	int level = bc->bc_bookmark.zb_level;
586 	uint64_t blkid = bc->bc_bookmark.zb_blkid;
587 	blkptr_t *bp = bc->bc_blkptr.blk_birth ? &bc->bc_blkptr : NULL;
588 	dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
589 	void *data = bc->bc_data;
590 	int err = 0;
591 
592 	if (issig(JUSTLOOKING) && issig(FORREAL))
593 		return (EINTR);
594 
595 	ASSERT(data || bp == NULL);
596 
597 	if (bp == NULL && object == 0) {
598 		uint64_t span = BP_SPAN(bc->bc_dnode, level);
599 		uint64_t dnobj = (blkid * span) >> DNODE_SHIFT;
600 		err = dump_freeobjects(ba, dnobj, span >> DNODE_SHIFT);
601 	} else if (bp == NULL) {
602 		uint64_t span = BP_SPAN(bc->bc_dnode, level);
603 		err = dump_free(ba, object, blkid * span, span);
604 	} else if (data && level == 0 && type == DMU_OT_DNODE) {
605 		dnode_phys_t *blk = data;
606 		int i;
607 		int blksz = BP_GET_LSIZE(bp);
608 
609 		for (i = 0; i < blksz >> DNODE_SHIFT; i++) {
610 			uint64_t dnobj =
611 			    (blkid << (DNODE_BLOCK_SHIFT - DNODE_SHIFT)) + i;
612 			err = dump_dnode(ba, dnobj, blk+i);
613 			if (err)
614 				break;
615 		}
616 	} else if (level == 0 &&
617 	    type != DMU_OT_DNODE && type != DMU_OT_OBJSET) {
618 		int blksz = BP_GET_LSIZE(bp);
619 		if (data == NULL) {
620 			arc_buf_t *abuf;
621 			zbookmark_t zb;
622 
623 			zb.zb_objset = ba->os->os->os_dsl_dataset->ds_object;
624 			zb.zb_object = object;
625 			zb.zb_level = level;
626 			zb.zb_blkid = blkid;
627 			(void) arc_read(NULL, spa, bp,
628 			    dmu_ot[type].ot_byteswap, arc_getbuf_func, &abuf,
629 			    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_MUSTSUCCEED,
630 			    ARC_WAIT, &zb);
631 
632 			if (abuf) {
633 				err = dump_data(ba, type, object, blkid * blksz,
634 				    blksz, abuf->b_data);
635 				(void) arc_buf_remove_ref(abuf, &abuf);
636 			}
637 		} else {
638 			err = dump_data(ba, type, object, blkid * blksz,
639 			    blksz, data);
640 		}
641 	}
642 
643 	ASSERT(err == 0 || err == EINTR);
644 	return (err);
645 }
646 
647 int
648 dmu_sendbackup(objset_t *tosnap, objset_t *fromsnap, vnode_t *vp)
649 {
650 	dsl_dataset_t *ds = tosnap->os->os_dsl_dataset;
651 	dsl_dataset_t *fromds = fromsnap ? fromsnap->os->os_dsl_dataset : NULL;
652 	dmu_replay_record_t *drr;
653 	struct backuparg ba;
654 	int err;
655 
656 	/* tosnap must be a snapshot */
657 	if (ds->ds_phys->ds_next_snap_obj == 0)
658 		return (EINVAL);
659 
660 	/* fromsnap must be an earlier snapshot from the same fs as tosnap */
661 	if (fromds && (ds->ds_dir != fromds->ds_dir ||
662 	    fromds->ds_phys->ds_creation_txg >=
663 	    ds->ds_phys->ds_creation_txg))
664 		return (EXDEV);
665 
666 	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
667 	drr->drr_type = DRR_BEGIN;
668 	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
669 	drr->drr_u.drr_begin.drr_version = DMU_BACKUP_VERSION;
670 	drr->drr_u.drr_begin.drr_creation_time =
671 	    ds->ds_phys->ds_creation_time;
672 	drr->drr_u.drr_begin.drr_type = tosnap->os->os_phys->os_type;
673 	drr->drr_u.drr_begin.drr_toguid = ds->ds_phys->ds_guid;
674 	if (fromds)
675 		drr->drr_u.drr_begin.drr_fromguid = fromds->ds_phys->ds_guid;
676 	dsl_dataset_name(ds, drr->drr_u.drr_begin.drr_toname);
677 
678 	ba.drr = drr;
679 	ba.vp = vp;
680 	ba.os = tosnap;
681 	ZIO_SET_CHECKSUM(&ba.zc, 0, 0, 0, 0);
682 
683 	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t))) {
684 		kmem_free(drr, sizeof (dmu_replay_record_t));
685 		return (ba.err);
686 	}
687 
688 	err = traverse_dsl_dataset(ds,
689 	    fromds ? fromds->ds_phys->ds_creation_txg : 0,
690 	    ADVANCE_PRE | ADVANCE_HOLES | ADVANCE_DATA | ADVANCE_NOLOCK,
691 	    backup_cb, &ba);
692 
693 	if (err) {
694 		if (err == EINTR && ba.err)
695 			err = ba.err;
		kmem_free(drr, sizeof (dmu_replay_record_t));
696 		return (err);
697 	}
698 
699 	bzero(drr, sizeof (dmu_replay_record_t));
700 	drr->drr_type = DRR_END;
701 	drr->drr_u.drr_end.drr_checksum = ba.zc;
702 
703 	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t))) {
		kmem_free(drr, sizeof (dmu_replay_record_t));
704 		return (ba.err);
	}
705 
706 	kmem_free(drr, sizeof (dmu_replay_record_t));
707 
708 	return (0);
709 }
710 
711 struct restorearg {
712 	int err;
713 	int byteswap;
714 	vnode_t *vp;
715 	char *buf;
716 	uint64_t voff;
717 	int buflen; /* number of valid bytes in buf */
718 	int bufoff; /* next offset to read */
719 	int bufsize; /* amount of memory allocated for buf */
720 	zio_cksum_t zc;
721 };
722 
723 static int
724 replay_incremental_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
725 {
726 	struct drr_begin *drrb = arg;
727 	dsl_dataset_t *ds = NULL;
728 	dsl_dataset_t *ds_prev = NULL;
729 	const char *snapname;
730 	int err = EINVAL;
731 	uint64_t val;
732 
733 	/* this must be a filesystem */
734 	if (dd->dd_phys->dd_head_dataset_obj == 0)
735 		goto die;
736 
737 	err = dsl_dataset_open_obj(dd->dd_pool,
738 	    dd->dd_phys->dd_head_dataset_obj,
739 	    NULL, DS_MODE_EXCLUSIVE, FTAG, &ds);
740 	if (err)
741 		goto die;
742 
743 	if (ds == NULL) {
744 		err = EBUSY;
745 		goto die;
746 	}
747 
748 	/* must already be a snapshot of this fs */
749 	if (ds->ds_phys->ds_prev_snap_obj == 0) {
750 		err = ENODEV;
751 		goto die;
752 	}
753 
754 	/* most recent snapshot must match fromguid */
755 	err = dsl_dataset_open_obj(dd->dd_pool,
756 	    ds->ds_phys->ds_prev_snap_obj, NULL,
757 	    DS_MODE_STANDARD | DS_MODE_READONLY, FTAG, &ds_prev);
758 	if (err)
759 		goto die;
760 	if (ds_prev->ds_phys->ds_guid != drrb->drr_fromguid) {
761 		err = ENODEV;
762 		goto die;
763 	}
764 
765 	/* must not have any changes since most recent snapshot */
766 	if (ds->ds_phys->ds_bp.blk_birth >
767 	    ds_prev->ds_phys->ds_creation_txg) {
768 		err = ETXTBSY;
769 		goto die;
770 	}
771 
772 	/* new snapshot name must not exist */
773 	snapname = strrchr(drrb->drr_toname, '@');
774 	if (snapname == NULL) {
775 		err = EEXIST;
776 		goto die;
777 	}
778 	snapname++;
779 	err = zap_lookup(dd->dd_pool->dp_meta_objset,
780 	    ds->ds_phys->ds_snapnames_zapobj, snapname, 8, 1, &val);
781 	if (err != ENOENT) {
782 		if (err == 0)
783 			err = EEXIST;
784 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
785 		dsl_dataset_close(ds_prev, DS_MODE_STANDARD, FTAG);
786 		return (err);
787 	}
788 
789 	dsl_dataset_close(ds_prev, DS_MODE_STANDARD, FTAG);
790 
791 	/* The point of no (unsuccessful) return. */
792 
793 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
794 	ds->ds_phys->ds_inconsistent = TRUE;
795 
796 	dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
797 	return (0);
798 
799 die:
800 	if (ds_prev)
801 		dsl_dataset_close(ds_prev, DS_MODE_STANDARD, FTAG);
802 	if (ds)
803 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
804 	return (err);
805 }
806 
807 static int
808 replay_full_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
809 {
810 	struct drr_begin *drrb = arg;
811 	int err;
812 	char *fsfullname, *fslastname, *cp;
813 	dsl_dataset_t *ds;
814 
815 	fsfullname = kmem_alloc(MAXNAMELEN, KM_SLEEP);
816 	(void) strncpy(fsfullname, drrb->drr_toname, MAXNAMELEN);
817 	cp = strchr(fsfullname, '@');
818 	if (cp == NULL) {
819 		kmem_free(fsfullname, MAXNAMELEN);
820 		return (EINVAL);
821 	}
822 	*cp = '\0';
823 	fslastname = strrchr(fsfullname, '/');
824 	if (fslastname == NULL) {
825 		kmem_free(fsfullname, MAXNAMELEN);
826 		return (EINVAL);
827 	}
828 	fslastname++;
829 
830 	err = dsl_dataset_create_sync(dd, fsfullname, fslastname, NULL, tx);
831 	if (err) {
832 		kmem_free(fsfullname, MAXNAMELEN);
833 		return (err);
834 	}
835 
836 	/* the point of no (unsuccessful) return */
837 
838 	VERIFY(0 == dsl_dataset_open_spa(dd->dd_pool->dp_spa, fsfullname,
839 	    DS_MODE_EXCLUSIVE, FTAG, &ds));
840 	kmem_free(fsfullname, MAXNAMELEN);
841 
842 	(void) dmu_objset_create_impl(dsl_dataset_get_spa(ds),
843 	    ds, drrb->drr_type, tx);
844 
845 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
846 	ds->ds_phys->ds_inconsistent = TRUE;
847 
848 	dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
849 	return (0);
850 }
851 
852 static int
853 replay_end_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
854 {
855 	struct drr_begin *drrb = arg;
856 	int err;
857 	char *snapname;
858 	dsl_dataset_t *ds;
859 
860 	/* XXX verify that drr_toname is in dd */
861 
862 	snapname = strchr(drrb->drr_toname, '@');
863 	if (snapname == NULL)
864 		return (EINVAL);
865 	snapname++;
866 
867 	/* create snapshot */
868 	err = dsl_dataset_snapshot_sync(dd, snapname, tx);
869 	if (err)
870 		return (err);
871 
872 	/* set snapshot's creation time and guid */
873 	VERIFY(0 == dsl_dataset_open_spa(dd->dd_pool->dp_spa, drrb->drr_toname,
874 	    DS_MODE_PRIMARY | DS_MODE_READONLY | DS_MODE_INCONSISTENT,
875 	    FTAG, &ds));
876 
877 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
878 	ds->ds_phys->ds_creation_time = drrb->drr_creation_time;
879 	ds->ds_phys->ds_guid = drrb->drr_toguid;
880 	ds->ds_phys->ds_inconsistent = FALSE;
881 
882 	dsl_dataset_close(ds, DS_MODE_PRIMARY, FTAG);
883 
884 	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
885 	    dd->dd_phys->dd_head_dataset_obj,
886 	    NULL, DS_MODE_STANDARD | DS_MODE_INCONSISTENT, FTAG, &ds));
887 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
888 	ds->ds_phys->ds_inconsistent = FALSE;
889 	dsl_dataset_close(ds, DS_MODE_STANDARD, FTAG);
890 
891 	return (0);
892 }
893 
894 void *
895 restore_read(struct restorearg *ra, int len)
896 {
897 	void *rv;
898 
899 	/* some things will require 8-byte alignment, so everything must */
900 	ASSERT3U(len % 8, ==, 0);
901 
902 	while (ra->buflen - ra->bufoff < len) {
903 		ssize_t resid;
904 		int leftover = ra->buflen - ra->bufoff;
905 
906 		(void) memmove(ra->buf, ra->buf + ra->bufoff, leftover);
907 		ra->err = vn_rdwr(UIO_READ, ra->vp,
908 		    (caddr_t)ra->buf + leftover, ra->bufsize - leftover,
909 		    ra->voff, UIO_SYSSPACE, FAPPEND,
910 		    RLIM64_INFINITY, CRED(), &resid);
911 
912 		ra->voff += ra->bufsize - leftover - resid;
913 		ra->buflen = ra->bufsize - resid;
914 		ra->bufoff = 0;
915 		if (resid == ra->bufsize - leftover)
916 			ra->err = EINVAL;
917 		if (ra->err)
918 			return (NULL);
919 		/* Could compute checksum here? */
920 	}
921 
922 	ASSERT3U(ra->bufoff % 8, ==, 0);
923 	ASSERT3U(ra->buflen - ra->bufoff, >=, len);
924 	rv = ra->buf + ra->bufoff;
925 	ra->bufoff += len;
926 	if (ra->byteswap)
927 		fletcher_4_incremental_byteswap(rv, len, &ra->zc);
928 	else
929 		fletcher_4_incremental_native(rv, len, &ra->zc);
930 	return (rv);
931 }
932 
933 static void
934 backup_byteswap(dmu_replay_record_t *drr)
935 {
936 #define	DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
937 #define	DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
938 	drr->drr_type = BSWAP_32(drr->drr_type);
939 	switch (drr->drr_type) {
940 	case DRR_BEGIN:
941 		DO64(drr_begin.drr_magic);
942 		DO64(drr_begin.drr_version);
943 		DO64(drr_begin.drr_creation_time);
944 		DO32(drr_begin.drr_type);
945 		DO64(drr_begin.drr_toguid);
946 		DO64(drr_begin.drr_fromguid);
947 		break;
948 	case DRR_OBJECT:
949 		DO64(drr_object.drr_object);
950 		/* DO64(drr_object.drr_allocation_txg); */
951 		DO32(drr_object.drr_type);
952 		DO32(drr_object.drr_bonustype);
953 		DO32(drr_object.drr_blksz);
954 		DO32(drr_object.drr_bonuslen);
955 		break;
956 	case DRR_FREEOBJECTS:
957 		DO64(drr_freeobjects.drr_firstobj);
958 		DO64(drr_freeobjects.drr_numobjs);
959 		break;
960 	case DRR_WRITE:
961 		DO64(drr_write.drr_object);
962 		DO32(drr_write.drr_type);
963 		DO64(drr_write.drr_offset);
964 		DO64(drr_write.drr_length);
965 		break;
966 	case DRR_FREE:
967 		DO64(drr_free.drr_object);
968 		DO64(drr_free.drr_offset);
969 		DO64(drr_free.drr_length);
970 		break;
971 	case DRR_END:
972 		DO64(drr_end.drr_checksum.zc_word[0]);
973 		DO64(drr_end.drr_checksum.zc_word[1]);
974 		DO64(drr_end.drr_checksum.zc_word[2]);
975 		DO64(drr_end.drr_checksum.zc_word[3]);
976 		break;
977 	}
978 #undef DO64
979 #undef DO32
980 }
981 
982 static int
983 restore_object(struct restorearg *ra, objset_t *os, struct drr_object *drro)
984 {
985 	int err;
986 	dmu_tx_t *tx;
987 
988 	err = dmu_object_info(os, drro->drr_object, NULL);
989 
990 	if (err != 0 && err != ENOENT)
991 		return (EINVAL);
992 
993 	if (drro->drr_type == DMU_OT_NONE ||
994 	    drro->drr_type >= DMU_OT_NUMTYPES ||
995 	    drro->drr_bonustype >= DMU_OT_NUMTYPES ||
996 	    drro->drr_checksum >= ZIO_CHECKSUM_FUNCTIONS ||
997 	    drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
998 	    P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
999 	    drro->drr_blksz < SPA_MINBLOCKSIZE ||
1000 	    drro->drr_blksz > SPA_MAXBLOCKSIZE ||
1001 	    drro->drr_bonuslen > DN_MAX_BONUSLEN) {
1002 		return (EINVAL);
1003 	}
1004 
1005 	tx = dmu_tx_create(os);
1006 
1007 	if (err == ENOENT) {
1008 		/* currently free, want to be allocated */
1009 		dmu_tx_hold_bonus(tx, DMU_NEW_OBJECT);
1010 		dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, 1);
1011 		err = dmu_tx_assign(tx, TXG_WAIT);
1012 		if (err) {
1013 			dmu_tx_abort(tx);
1014 			return (err);
1015 		}
1016 		err = dmu_object_claim(os, drro->drr_object,
1017 		    drro->drr_type, drro->drr_blksz,
1018 		    drro->drr_bonustype, drro->drr_bonuslen, tx);
1019 	} else {
1020 		/* currently allocated, want to be allocated */
1021 		dmu_tx_hold_bonus(tx, drro->drr_object);
1022 		/*
1023 		 * We may change blocksize, so need to
1024 		 * hold_write
1025 		 */
1026 		dmu_tx_hold_write(tx, drro->drr_object, 0, 1);
1027 		err = dmu_tx_assign(tx, TXG_WAIT);
1028 		if (err) {
1029 			dmu_tx_abort(tx);
1030 			return (err);
1031 		}
1032 
1033 		err = dmu_object_reclaim(os, drro->drr_object,
1034 		    drro->drr_type, drro->drr_blksz,
1035 		    drro->drr_bonustype, drro->drr_bonuslen, tx);
1036 	}
1037 	if (err) {
1038 		dmu_tx_commit(tx);
1039 		return (EINVAL);
1040 	}
1041 
1042 	dmu_object_set_checksum(os, drro->drr_object, drro->drr_checksum, tx);
1043 	dmu_object_set_compress(os, drro->drr_object, drro->drr_compress, tx);
1044 
1045 	if (drro->drr_bonuslen) {
1046 		dmu_buf_t *db;
1047 		void *data;
1048 		VERIFY(0 == dmu_bonus_hold(os, drro->drr_object, FTAG, &db));
1049 		dmu_buf_will_dirty(db, tx);
1050 
1051 		ASSERT3U(db->db_size, ==, drro->drr_bonuslen);
1052 		data = restore_read(ra, P2ROUNDUP(db->db_size, 8));
1053 		if (data == NULL) {
1054 			dmu_tx_commit(tx);
1055 			return (ra->err);
1056 		}
1057 		bcopy(data, db->db_data, db->db_size);
1058 		if (ra->byteswap) {
1059 			dmu_ot[drro->drr_bonustype].ot_byteswap(db->db_data,
1060 			    drro->drr_bonuslen);
1061 		}
1062 		dmu_buf_rele(db, FTAG);
1063 	}
1064 	dmu_tx_commit(tx);
1065 	return (0);
1066 }
1067 
1068 /* ARGSUSED */
1069 static int
1070 restore_freeobjects(struct restorearg *ra, objset_t *os,
1071     struct drr_freeobjects *drrfo)
1072 {
1073 	uint64_t obj;
1074 
1075 	if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
1076 		return (EINVAL);
1077 
1078 	for (obj = drrfo->drr_firstobj;
1079 	    obj < drrfo->drr_firstobj + drrfo->drr_numobjs; obj++) {
1080 		dmu_tx_t *tx;
1081 		int err;
1082 
1083 		if (dmu_object_info(os, obj, NULL) != 0)
1084 			continue;
1085 
1086 		tx = dmu_tx_create(os);
1087 		dmu_tx_hold_bonus(tx, obj);
1088 		err = dmu_tx_assign(tx, TXG_WAIT);
1089 		if (err) {
1090 			dmu_tx_abort(tx);
1091 			return (err);
1092 		}
1093 		err = dmu_object_free(os, obj, tx);
1094 		dmu_tx_commit(tx);
1095 		if (err && err != ENOENT)
1096 			return (EINVAL);
1097 	}
1098 	return (0);
1099 }
1100 
1101 static int
1102 restore_write(struct restorearg *ra, objset_t *os,
1103     struct drr_write *drrw)
1104 {
1105 	dmu_tx_t *tx;
1106 	void *data;
1107 	int err;
1108 
1109 	if (drrw->drr_offset + drrw->drr_length < drrw->drr_offset ||
1110 	    drrw->drr_type >= DMU_OT_NUMTYPES)
1111 		return (EINVAL);
1112 
1113 	data = restore_read(ra, drrw->drr_length);
1114 	if (data == NULL)
1115 		return (ra->err);
1116 
1117 	if (dmu_object_info(os, drrw->drr_object, NULL) != 0)
1118 		return (EINVAL);
1119 
1120 	tx = dmu_tx_create(os);
1121 
1122 	dmu_tx_hold_write(tx, drrw->drr_object,
1123 	    drrw->drr_offset, drrw->drr_length);
1124 	err = dmu_tx_assign(tx, TXG_WAIT);
1125 	if (err) {
1126 		dmu_tx_abort(tx);
1127 		return (err);
1128 	}
1129 	if (ra->byteswap)
1130 		dmu_ot[drrw->drr_type].ot_byteswap(data, drrw->drr_length);
1131 	dmu_write(os, drrw->drr_object,
1132 	    drrw->drr_offset, drrw->drr_length, data, tx);
1133 	dmu_tx_commit(tx);
1134 	return (0);
1135 }
1136 
1137 /* ARGSUSED */
1138 static int
1139 restore_free(struct restorearg *ra, objset_t *os,
1140     struct drr_free *drrf)
1141 {
1142 	dmu_tx_t *tx;
1143 	int err;
1144 
1145 	if (drrf->drr_length != -1ULL &&
1146 	    drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
1147 		return (EINVAL);
1148 
1149 	if (dmu_object_info(os, drrf->drr_object, NULL) != 0)
1150 		return (EINVAL);
1151 
1152 	tx = dmu_tx_create(os);
1153 
1154 	dmu_tx_hold_free(tx, drrf->drr_object,
1155 	    drrf->drr_offset, drrf->drr_length);
1156 	err = dmu_tx_assign(tx, TXG_WAIT);
1157 	if (err) {
1158 		dmu_tx_abort(tx);
1159 		return (err);
1160 	}
1161 	err = dmu_free_range(os, drrf->drr_object,
1162 	    drrf->drr_offset, drrf->drr_length, tx);
1163 	dmu_tx_commit(tx);
1164 	return (err);
1165 }
1166 
1167 int
1168 dmu_recvbackup(char *tosnap, struct drr_begin *drrb, uint64_t *sizep,
1169     vnode_t *vp, uint64_t voffset)
1170 {
1171 	struct restorearg ra;
1172 	dmu_replay_record_t *drr;
1173 	char *cp;
1174 	dsl_dir_t *dd = NULL;
1175 	objset_t *os = NULL;
1176 	zio_cksum_t pzc;
1177 
1178 	bzero(&ra, sizeof (ra));
1179 	ra.vp = vp;
1180 	ra.voff = voffset;
1181 	ra.bufsize = 1<<20;
1182 	ra.buf = kmem_alloc(ra.bufsize, KM_SLEEP);
1183 
1184 	if (drrb->drr_magic == DMU_BACKUP_MAGIC) {
1185 		ra.byteswap = FALSE;
1186 	} else if (drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
1187 		ra.byteswap = TRUE;
1188 	} else {
1189 		ra.err = EINVAL;
1190 		goto out;
1191 	}
1192 
1193 	/*
1194 	 * NB: this assumes that struct drr_begin will be the largest in
1195 	 * dmu_replay_record_t's drr_u, and thus we don't need to pad it
1196 	 * with zeros to make it the same length as we wrote out.
1197 	 */
1198 	((dmu_replay_record_t *)ra.buf)->drr_type = DRR_BEGIN;
1199 	((dmu_replay_record_t *)ra.buf)->drr_pad = 0;
1200 	((dmu_replay_record_t *)ra.buf)->drr_u.drr_begin = *drrb;
1201 	if (ra.byteswap) {
1202 		fletcher_4_incremental_byteswap(ra.buf,
1203 		    sizeof (dmu_replay_record_t), &ra.zc);
1204 	} else {
1205 		fletcher_4_incremental_native(ra.buf,
1206 		    sizeof (dmu_replay_record_t), &ra.zc);
1207 	}
1208 	(void) strcpy(drrb->drr_toname, tosnap); /* for the sync funcs */
1209 
1210 	if (ra.byteswap) {
1211 		drrb->drr_magic = BSWAP_64(drrb->drr_magic);
1212 		drrb->drr_version = BSWAP_64(drrb->drr_version);
1213 		drrb->drr_creation_time = BSWAP_64(drrb->drr_creation_time);
1214 		drrb->drr_type = BSWAP_32(drrb->drr_type);
1215 		drrb->drr_toguid = BSWAP_64(drrb->drr_toguid);
1216 		drrb->drr_fromguid = BSWAP_64(drrb->drr_fromguid);
1217 	}
1218 
1219 	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
1220 
1221 	if (drrb->drr_version != DMU_BACKUP_VERSION ||
1222 	    drrb->drr_type >= DMU_OST_NUMTYPES ||
1223 	    strchr(drrb->drr_toname, '@') == NULL) {
1224 		ra.err = EINVAL;
1225 		goto out;
1226 	}
1227 
1228 	/*
1229 	 * Process the begin in syncing context.
1230 	 */
1231 	if (drrb->drr_fromguid) {
1232 		/* incremental backup */
1233 
1234 		cp = strchr(tosnap, '@');
1235 		*cp = '\0';
1236 		ra.err = dsl_dir_open(tosnap, FTAG, &dd, NULL);
1237 		*cp = '@';
1238 		if (ra.err)
1239 			goto out;
1240 
1241 		ra.err = dsl_dir_sync_task(dd, replay_incremental_sync,
1242 		    drrb, 1<<20);
1243 	} else {
1244 		/* full backup */
1245 		const char *tail;
1246 
1247 		cp = strchr(tosnap, '@');
1248 		*cp = '\0';
1249 		ra.err = dsl_dir_open(tosnap, FTAG, &dd, &tail);
1250 		*cp = '@';
1251 		if (ra.err)
1252 			goto out;
1253 		if (tail == NULL) {
1254 			ra.err = EEXIST;
1255 			goto out;
1256 		}
1257 
1258 		ra.err = dsl_dir_sync_task(dd, replay_full_sync,
1259 		    drrb, 1<<20);
1260 	}
1261 	if (ra.err)
1262 		goto out;
1263 
1264 	/*
1265 	 * Open the objset we are modifying.
1266 	 */
1267 
1268 	cp = strchr(tosnap, '@');
1269 	*cp = '\0';
1270 	ra.err = dmu_objset_open(tosnap, DMU_OST_ANY,
1271 	    DS_MODE_PRIMARY | DS_MODE_INCONSISTENT, &os);
1272 	*cp = '@';
1273 	ASSERT3U(ra.err, ==, 0);
1274 
1275 	/*
1276 	 * Read records and process them.
1277 	 */
1278 	pzc = ra.zc;
1279 	while (ra.err == 0 &&
1280 	    NULL != (drr = restore_read(&ra, sizeof (*drr)))) {
1281 		if (issig(JUSTLOOKING) && issig(FORREAL)) {
1282 			ra.err = EINTR;
1283 			goto out;
1284 		}
1285 
1286 		if (ra.byteswap)
1287 			backup_byteswap(drr);
1288 
1289 		switch (drr->drr_type) {
1290 		case DRR_OBJECT:
1291 		{
1292 			/*
1293 			 * We need to make a copy of the record header,
1294 			 * because restore_{object,write} may need to
1295 			 * restore_read(), which will invalidate drr.
1296 			 */
1297 			struct drr_object drro = drr->drr_u.drr_object;
1298 			ra.err = restore_object(&ra, os, &drro);
1299 			break;
1300 		}
1301 		case DRR_FREEOBJECTS:
1302 		{
1303 			struct drr_freeobjects drrfo =
1304 			    drr->drr_u.drr_freeobjects;
1305 			ra.err = restore_freeobjects(&ra, os, &drrfo);
1306 			break;
1307 		}
1308 		case DRR_WRITE:
1309 		{
1310 			struct drr_write drrw = drr->drr_u.drr_write;
1311 			ra.err = restore_write(&ra, os, &drrw);
1312 			break;
1313 		}
1314 		case DRR_FREE:
1315 		{
1316 			struct drr_free drrf = drr->drr_u.drr_free;
1317 			ra.err = restore_free(&ra, os, &drrf);
1318 			break;
1319 		}
1320 		case DRR_END:
1321 		{
1322 			struct drr_end drre = drr->drr_u.drr_end;
1323 			/*
1324 			 * We compare against the *previous* checksum
1325 			 * value, because the stored checksum is of
1326 			 * everything before the DRR_END record.
1327 			 */
1328 			if (drre.drr_checksum.zc_word[0] != 0 &&
1329 			    ((drre.drr_checksum.zc_word[0] - pzc.zc_word[0]) |
1330 			    (drre.drr_checksum.zc_word[1] - pzc.zc_word[1]) |
1331 			    (drre.drr_checksum.zc_word[2] - pzc.zc_word[2]) |
1332 			    (drre.drr_checksum.zc_word[3] - pzc.zc_word[3]))) {
1333 				ra.err = ECKSUM;
1334 				goto out;
1335 			}
1336 
1337 			/*
1338 			 * dd may be the parent of the dd we are
1339 			 * restoring into (eg. if it's a full backup).
1340 			 */
1341 			ra.err = dsl_dir_sync_task(dmu_objset_ds(os)->
1342 			    ds_dir, replay_end_sync, drrb, 1<<20);
1343 			goto out;
1344 		}
1345 		default:
1346 			ra.err = EINVAL;
1347 			goto out;
1348 		}
1349 		pzc = ra.zc;
1350 	}
1351 
1352 out:
1353 	if (os)
1354 		dmu_objset_close(os);
1355 
1356 	/*
1357 	 * Make sure we don't rollback/destroy unless we actually
1358 	 * processed the begin properly.  'os' will only be set if this
1359 	 * is the case.
1360 	 */
1361 	if (ra.err && os && dd && tosnap && strchr(tosnap, '@')) {
1362 		/*
1363 		 * rollback or destroy what we created, so we don't
1364 		 * leave it in the restoring state.
1365 		 */
1366 		txg_wait_synced(dd->dd_pool, 0);
1367 		if (drrb->drr_fromguid) {
1368 			/* incremental: rollback to most recent snapshot */
1369 			(void) dsl_dir_sync_task(dd,
1370 			    dsl_dataset_rollback_sync, NULL, 0);
1371 		} else {
1372 			/* full: destroy whole fs */
1373 			cp = strchr(tosnap, '@');
1374 			*cp = '\0';
1375 			cp = strchr(tosnap, '/');
1376 			if (cp) {
1377 				(void) dsl_dir_sync_task(dd,
1378 				    dsl_dir_destroy_sync, cp+1, 0);
1379 			}
1380 			cp = strchr(tosnap, '\0');
1381 			*cp = '@';
1382 		}
1383 
1384 	}
1385 
1386 	if (dd)
1387 		dsl_dir_close(dd, FTAG);
1388 	kmem_free(ra.buf, ra.bufsize);
1389 	if (sizep)
1390 		*sizep = ra.voff;
1391 	return (ra.err);
1392 }
1393 
1394 /*
1395  * Intent log support: sync the block at <os, object, offset> to disk.
1396  * N.B. and XXX: the caller is responsible for serializing dmu_sync()s
1397  * of the same block, and for making sure that the data isn't changing
1398  * while dmu_sync() is writing it.
1399  *
1400  * Return values:
1401  *
1402  *	EALREADY: this txg has already been synced, so there's nothing to do.
1403  *		The caller should not log the write.
1404  *
1405  *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
1406  *		The caller should not log the write.
1407  *
1408  *	EINPROGRESS: the block is in the process of being synced by the
1409  *		usual mechanism (spa_sync()), so we can't sync it here.
1410  *		The caller should txg_wait_synced() and not log the write.
1411  *
1412  *	EBUSY: another thread is trying to dmu_sync() the same dbuf.
1413  *		(This case cannot arise under the current locking rules.)
1414  *		The caller should txg_wait_synced() and not log the write.
1415  *
1416  *	ESTALE: the block was dirtied or freed while we were writing it,
1417  *		so the data is no longer valid.
1418  *		The caller should txg_wait_synced() and not log the write.
1419  *
1420  *	0: success.  Sets *bp to the blkptr just written, and sets
1421  *		*blkoff to the data's offset within that block.
1422  *		The caller should log this blkptr/blkoff in its lr_write_t.
1423  */
1424 int
1425 dmu_sync(objset_t *os, uint64_t object, uint64_t offset, uint64_t *blkoff,
1426     blkptr_t *bp, uint64_t txg)
1427 {
1428 	dsl_pool_t *dp = os->os->os_dsl_dataset->ds_dir->dd_pool;
1429 	tx_state_t *tx = &dp->dp_tx;
1430 	dmu_buf_impl_t *db;
1431 	blkptr_t *blk;
1432 	int err;
1433 	zbookmark_t zb;
1434 
1435 	ASSERT(RW_LOCK_HELD(&tx->tx_suspend));
1436 	ASSERT(BP_IS_HOLE(bp));
1437 	ASSERT(txg != 0);
1438 
1439 	dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
1440 	    txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);
1441 
1442 	/*
1443 	 * XXX why is this routine using dmu_buf_*() and casting between
1444 	 * dmu_buf_impl_t and dmu_buf_t?
1445 	 */
1446 
1447 	/*
1448 	 * If this txg already synced, there's nothing to do.
1449 	 */
1450 	if (txg <= tx->tx_synced_txg) {
1451 		/*
1452 		 * If we're running ziltest, we need the blkptr regardless.
1453 		 */
1454 		if (txg > spa_freeze_txg(dp->dp_spa)) {
1455 			err = dmu_buf_hold(os, object, offset,
1456 			    FTAG, (dmu_buf_t **)&db);
1457 			if (err)
1458 				return (err);
1459 			/* if db_blkptr == NULL, this was an empty write */
1460 			if (db->db_blkptr)
1461 				*bp = *db->db_blkptr; /* structure assignment */
1462 			else
1463 				bzero(bp, sizeof (blkptr_t));
1464 			*blkoff = offset - db->db.db_offset;
1465 			ASSERT3U(*blkoff, <, db->db.db_size);
1466 			dmu_buf_rele((dmu_buf_t *)db, FTAG);
1467 			return (0);
1468 		}
1469 		return (EALREADY);
1470 	}
1471 
1472 	/*
1473 	 * If this txg is in the middle of syncing, just wait for it.
1474 	 */
1475 	if (txg == tx->tx_syncing_txg) {
1476 		ASSERT(txg != tx->tx_open_txg);
1477 		return (EINPROGRESS);
1478 	}
1479 
1480 	err = dmu_buf_hold(os, object, offset, FTAG, (dmu_buf_t **)&db);
1481 	if (err)
1482 		return (err);
1483 
1484 	mutex_enter(&db->db_mtx);
1485 
1486 	/*
1487 	 * If this dbuf isn't dirty, must have been free_range'd.
1488 	 * There's no need to log writes to freed blocks, so we're done.
1489 	 */
1490 	if (!list_link_active(&db->db_dirty_node[txg&TXG_MASK])) {
1491 		mutex_exit(&db->db_mtx);
1492 		dmu_buf_rele((dmu_buf_t *)db, FTAG);
1493 		return (ENOENT);
1494 	}
1495 
1496 	blk = db->db_d.db_overridden_by[txg&TXG_MASK];
1497 
1498 	/*
1499 	 * If we already did a dmu_sync() of this dbuf in this txg,
1500 	 * free the old block before writing the new one.
1501 	 */
1502 	if (blk != NULL) {
1503 		ASSERT(blk != IN_DMU_SYNC);
1504 		if (blk == IN_DMU_SYNC) {
1505 			mutex_exit(&db->db_mtx);
1506 			dmu_buf_rele((dmu_buf_t *)db, FTAG);
1507 			return (EBUSY);
1508 		}
1509 		arc_release(db->db_d.db_data_old[txg&TXG_MASK], db);
1510 		if (!BP_IS_HOLE(blk)) {
1511 			(void) arc_free(NULL, os->os->os_spa, txg, blk,
1512 			    NULL, NULL, ARC_WAIT);
1513 		}
1514 		kmem_free(blk, sizeof (blkptr_t));
1515 	}
1516 
1517 	db->db_d.db_overridden_by[txg&TXG_MASK] = IN_DMU_SYNC;
1518 	mutex_exit(&db->db_mtx);
1519 
1520 	blk = kmem_alloc(sizeof (blkptr_t), KM_SLEEP);
1521 	blk->blk_birth = 0; /* mark as invalid */
1522 
1523 	zb.zb_objset = os->os->os_dsl_dataset->ds_object;
1524 	zb.zb_object = db->db.db_object;
1525 	zb.zb_level = db->db_level;
1526 	zb.zb_blkid = db->db_blkid;
1527 	err = arc_write(NULL, os->os->os_spa,
1528 	    zio_checksum_select(db->db_dnode->dn_checksum, os->os->os_checksum),
1529 	    zio_compress_select(db->db_dnode->dn_compress, os->os->os_compress),
1530 	    txg, blk, db->db_d.db_data_old[txg&TXG_MASK], NULL, NULL,
1531 	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, ARC_WAIT, &zb);
1532 	ASSERT(err == 0);
1533 
1534 	if (!BP_IS_HOLE(blk)) {
1535 		blk->blk_fill = 1;
1536 		BP_SET_TYPE(blk, db->db_dnode->dn_type);
1537 		BP_SET_LEVEL(blk, 0);
1538 	}
1539 
1540 	/* copy the block pointer back to caller */
1541 	*bp = *blk; /* structure assignment */
1542 	*blkoff = offset - db->db.db_offset;
1543 	ASSERT3U(*blkoff, <, db->db.db_size);
1544 
1545 	mutex_enter(&db->db_mtx);
1546 	if (db->db_d.db_overridden_by[txg&TXG_MASK] != IN_DMU_SYNC) {
1547 		/* we were dirtied/freed during the sync */
1548 		ASSERT3P(db->db_d.db_overridden_by[txg&TXG_MASK], ==, NULL);
1549 		arc_release(db->db_d.db_data_old[txg&TXG_MASK], db);
1550 		mutex_exit(&db->db_mtx);
1551 		dmu_buf_rele((dmu_buf_t *)db, FTAG);
1552 		/* Note that this block is not freed on disk until txg syncs */
1553 
1554 		/*
1555 		 * XXX can we use ARC_NOWAIT here?
1556 		 * XXX should we be ignoring the return code?
1557 		 */
1558 		if (!BP_IS_HOLE(blk)) {
1559 			(void) arc_free(NULL, os->os->os_spa, txg, blk,
1560 			    NULL, NULL, ARC_WAIT);
1561 		}
1562 		kmem_free(blk, sizeof (blkptr_t));
1563 		return (ESTALE);
1564 	}
1565 
1566 	db->db_d.db_overridden_by[txg&TXG_MASK] = blk;
1567 	mutex_exit(&db->db_mtx);
1568 	dmu_buf_rele((dmu_buf_t *)db, FTAG);
1569 	ASSERT3U(txg, >, tx->tx_syncing_txg);
1570 	return (0);
1571 }
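
/*
 * Illustrative sketch (editorial, not part of the original file): how an
 * intent-log writer might consume the dmu_sync() contract documented
 * above.  The function name is hypothetical.
 */
static int
example_log_write(objset_t *os, uint64_t object, uint64_t off, uint64_t txg,
    blkptr_t *bp, uint64_t *blkoff)
{
	int err = dmu_sync(os, object, off, blkoff, bp, txg);

	switch (err) {
	case 0:		/* log *bp / *blkoff in the lr_write_t */
	case EALREADY:	/* txg already synced; don't log */
	case ENOENT:	/* block was freed; don't log */
		return (err);
	default:	/* EINPROGRESS, EBUSY, ESTALE, or an i/o error */
		txg_wait_synced(dmu_objset_pool(os), txg);
		return (err);
	}
}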
1572 
1573 uint64_t
1574 dmu_object_max_nonzero_offset(objset_t *os, uint64_t object)
1575 {
1576 	dnode_t *dn;
	uint64_t rv;
1577 
1578 	/* XXX assumes dnode_hold will not get an i/o error */
1579 	(void) dnode_hold(os->os, object, FTAG, &dn);
1580 	rv = dnode_max_nonzero_offset(dn);
1581 	dnode_rele(dn, FTAG);
1582 	return (rv);
1583 }
1584 
1585 int
1586 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
1587 	dmu_tx_t *tx)
1588 {
1589 	dnode_t *dn;
1590 	int err;
1591 
1592 	err = dnode_hold(os->os, object, FTAG, &dn);
1593 	if (err)
1594 		return (err);
1595 	err = dnode_set_blksz(dn, size, ibs, tx);
1596 	dnode_rele(dn, FTAG);
1597 	return (err);
1598 }
1599 
1600 void
1601 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
1602 	dmu_tx_t *tx)
1603 {
1604 	dnode_t *dn;
1605 
1606 	/* XXX assumes dnode_hold will not get an i/o error */
1607 	(void) dnode_hold(os->os, object, FTAG, &dn);
1608 	ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
1609 	dn->dn_checksum = checksum;
1610 	dnode_setdirty(dn, tx);
1611 	dnode_rele(dn, FTAG);
1612 }
1613 
1614 void
1615 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
1616 	dmu_tx_t *tx)
1617 {
1618 	dnode_t *dn;
1619 
1620 	/* XXX assumes dnode_hold will not get an i/o error */
1621 	(void) dnode_hold(os->os, object, FTAG, &dn);
1622 	ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
1623 	dn->dn_compress = compress;
1624 	dnode_setdirty(dn, tx);
1625 	dnode_rele(dn, FTAG);
1626 }
1627 
1628 int
1629 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
1630 {
1631 	dnode_t *dn;
1632 	int i, err;
1633 
1634 	err = dnode_hold(os->os, object, FTAG, &dn);
1635 	if (err)
1636 		return (err);
1637 	/*
1638 	 * Sync any current changes before
1639 	 * we go trundling through the block pointers.
1640 	 */
1641 	for (i = 0; i < TXG_SIZE; i++) {
1642 		if (list_link_active(&dn->dn_dirty_link[i]))
1643 			break;
1644 	}
1645 	if (i != TXG_SIZE) {
1646 		dnode_rele(dn, FTAG);
1647 		txg_wait_synced(dmu_objset_pool(os), 0);
1648 		err = dnode_hold(os->os, object, FTAG, &dn);
1649 		if (err)
1650 			return (err);
1651 	}
1652 
1653 	err = dnode_next_offset(dn, hole, off, 1, 1);
1654 	dnode_rele(dn, FTAG);
1655 
1656 	return (err);
1657 }
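
/*
 * Illustrative sketch (editorial, not part of the original file):
 * dmu_offset_next() can back SEEK_HOLE/SEEK_DATA-style lookups; with
 * hole == B_TRUE it finds the next hole at or after *off, with
 * hole == B_FALSE the next data.
 */
static int
example_next_hole(objset_t *os, uint64_t object, uint64_t *off)
{
	return (dmu_offset_next(os, object, B_TRUE, off));
}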
1658 
1659 void
1660 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
1661 {
1662 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
1663 	mutex_enter(&dn->dn_mtx);
1664 
1665 	doi->doi_data_block_size = dn->dn_datablksz;
1666 	doi->doi_metadata_block_size = dn->dn_indblkshift ?
1667 	    1ULL << dn->dn_indblkshift : 0;
1668 	doi->doi_indirection = dn->dn_nlevels;
1669 	doi->doi_checksum = dn->dn_checksum;
1670 	doi->doi_compress = dn->dn_compress;
1671 	doi->doi_physical_blks = dn->dn_phys->dn_secphys;
1672 	doi->doi_max_block_offset = dn->dn_phys->dn_maxblkid;
1673 	doi->doi_type = dn->dn_type;
1674 	doi->doi_bonus_size = dn->dn_bonuslen;
1675 	doi->doi_bonus_type = dn->dn_bonustype;
1676 
1677 	mutex_exit(&dn->dn_mtx);
1678 	rw_exit(&dn->dn_struct_rwlock);
1679 }
1680 
1681 /*
1682  * Get information on a DMU object.
1683  * If doi is NULL, just indicates whether the object exists.
1684  */
1685 int
1686 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
1687 {
1688 	dnode_t *dn;
1689 	int err = dnode_hold(os->os, object, FTAG, &dn);
1690 
1691 	if (err)
1692 		return (err);
1693 
1694 	if (doi != NULL)
1695 		dmu_object_info_from_dnode(dn, doi);
1696 
1697 	dnode_rele(dn, FTAG);
1698 	return (0);
1699 }
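
/*
 * Illustrative sketch (editorial, not part of the original file): passing
 * a NULL doi turns dmu_object_info() into a cheap existence test, the
 * idiom restore_write() and restore_free() use above.
 */
static boolean_t
example_object_exists(objset_t *os, uint64_t object)
{
	return (dmu_object_info(os, object, NULL) == 0);
}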
1700 
1701 /*
1702  * As above, but faster; can be used when you have a held dbuf in hand.
1703  */
1704 void
1705 dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
1706 {
1707 	dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
1708 }
1709 
1710 /*
1711  * Faster still when you only care about the size.
1712  * This is specifically optimized for zfs_getattr().
1713  */
1714 void
1715 dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
1716 {
1717 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
1718 
1719 	*blksize = dn->dn_datablksz;
1720 	*nblk512 = dn->dn_phys->dn_secphys + 1;	/* add 1 for dnode space */
1721 }
1722 
1723 /*
1724  * Given a bookmark, return the name of the dataset, object, and range in
1725  * human-readable format.
1726  */
1727 int
1728 spa_bookmark_name(spa_t *spa, zbookmark_t *zb, char *dsname, size_t dslen,
1729     char *objname, size_t objlen, char *range, size_t rangelen)
1730 {
1731 	dsl_pool_t *dp;
1732 	dsl_dataset_t *ds = NULL;
1733 	objset_t *os = NULL;
1734 	dnode_t *dn = NULL;
1735 	int err, shift;
1736 
1737 	if (dslen < MAXNAMELEN || objlen < 32 || rangelen < 64)
1738 		return (ENOSPC);
1739 
1740 	dp = spa_get_dsl(spa);
1741 	if (zb->zb_objset != 0) {
1742 		rw_enter(&dp->dp_config_rwlock, RW_READER);
1743 		err = dsl_dataset_open_obj(dp, zb->zb_objset,
1744 		    NULL, DS_MODE_NONE, FTAG, &ds);
1745 		if (err) {
1746 			rw_exit(&dp->dp_config_rwlock);
1747 			return (err);
1748 		}
1749 		dsl_dataset_name(ds, dsname);
1750 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1751 		rw_exit(&dp->dp_config_rwlock);
1752 
1753 		err = dmu_objset_open(dsname, DMU_OST_ANY, DS_MODE_NONE, &os);
1754 		if (err)
1755 			goto out;
1756 
1757 	} else {
1758 		dsl_dataset_name(NULL, dsname);
1759 		os = dp->dp_meta_objset;
1760 	}
1761 
1762 
1763 	if (zb->zb_object == DMU_META_DNODE_OBJECT) {
1764 		(void) strncpy(objname, "mdn", objlen);
1765 	} else {
1766 		(void) snprintf(objname, objlen, "%lld",
1767 		    (longlong_t)zb->zb_object);
1768 	}
1769 
1770 	err = dnode_hold(os->os, zb->zb_object, FTAG, &dn);
1771 	if (err)
1772 		goto out;
1773 
1774 	shift = (dn->dn_datablkshift?dn->dn_datablkshift:SPA_MAXBLOCKSHIFT) +
1775 	    zb->zb_level * (dn->dn_indblkshift - SPA_BLKPTRSHIFT);
1776 	(void) snprintf(range, rangelen, "%llu-%llu",
1777 	    (u_longlong_t)(zb->zb_blkid << shift),
1778 	    (u_longlong_t)((zb->zb_blkid+1) << shift));
1779 
1780 out:
1781 	if (dn)
1782 		dnode_rele(dn, FTAG);
1783 	if (os && os != dp->dp_meta_objset)
1784 		dmu_objset_close(os);
1785 	return (err);
1786 }
1787 
1788 void
1789 byteswap_uint64_array(void *vbuf, size_t size)
1790 {
1791 	uint64_t *buf = vbuf;
1792 	size_t count = size >> 3;
1793 	int i;
1794 
1795 	ASSERT((size & 7) == 0);
1796 
1797 	for (i = 0; i < count; i++)
1798 		buf[i] = BSWAP_64(buf[i]);
1799 }
1800 
1801 void
1802 byteswap_uint32_array(void *vbuf, size_t size)
1803 {
1804 	uint32_t *buf = vbuf;
1805 	size_t count = size >> 2;
1806 	int i;
1807 
1808 	ASSERT((size & 3) == 0);
1809 
1810 	for (i = 0; i < count; i++)
1811 		buf[i] = BSWAP_32(buf[i]);
1812 }
1813 
1814 void
1815 byteswap_uint16_array(void *vbuf, size_t size)
1816 {
1817 	uint16_t *buf = vbuf;
1818 	size_t count = size >> 1;
1819 	int i;
1820 
1821 	ASSERT((size & 1) == 0);
1822 
1823 	for (i = 0; i < count; i++)
1824 		buf[i] = BSWAP_16(buf[i]);
1825 }
1826 
1827 /* ARGSUSED */
1828 void
1829 byteswap_uint8_array(void *vbuf, size_t size)
1830 {
1831 }
1832 
1833 void
1834 dmu_init(void)
1835 {
1836 	dbuf_init();
1837 	dnode_init();
1838 	arc_init();
1839 }
1840 
1841 void
1842 dmu_fini(void)
1843 {
1844 	arc_fini();
1845 	dnode_fini();
1846 	dbuf_fini();
1847 }
1848