xref: /illumos-gate/usr/src/uts/common/fs/zfs/dmu.c (revision 44eda4d76a9383a159e44aa60b63a17644ddd5b1)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dbuf.h>
32 #include <sys/dnode.h>
33 #include <sys/zfs_context.h>
34 #include <sys/dmu_objset.h>
35 #include <sys/dmu_traverse.h>
36 #include <sys/dsl_dataset.h>
37 #include <sys/dsl_dir.h>
38 #include <sys/dsl_pool.h>
39 #include <sys/dsl_synctask.h>
40 #include <sys/dmu_zfetch.h>
41 #include <sys/zfs_ioctl.h>
42 #include <sys/zap.h>
43 #include <sys/zio_checksum.h>
44 #ifdef _KERNEL
45 #include <sys/vmsystm.h>
46 #endif
47 
/*
 * Per-object-type information, one entry for each of the DMU_OT_NUMTYPES
 * object types, in type-number order.  Each entry holds:
 *   - the function used to byteswap a buffer of that type (ot_byteswap),
 *   - a boolean flag (presumably "is metadata" -- confirm against the
 *     dmu_object_type_info_t declaration),
 *   - a human-readable name for the type.
 */
const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
	{	zap_byteswap,		TRUE,	"object directory"	},
	{	byteswap_uint64_array,	TRUE,	"object array"		},
	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
	{	byteswap_uint64_array,	TRUE,	"bplist"		},
	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
	{	zap_byteswap,		TRUE,	"DSL directory child map"},
	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
	{	zap_byteswap,		TRUE,	"DSL props"		},
	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
	{	zap_byteswap,		TRUE,	"ZFS directory"		},
	{	zap_byteswap,		TRUE,	"ZFS master node"	},
	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
	{	zap_byteswap,		TRUE,	"zvol prop"		},
	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
	{	zap_byteswap,		TRUE,	"other ZAP"		},
	{	zap_byteswap,		TRUE,	"persistent error log"	},
};
79 
80 int
81 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
82     void *tag, dmu_buf_t **dbp)
83 {
84 	dnode_t *dn;
85 	uint64_t blkid;
86 	dmu_buf_impl_t *db;
87 	int err;
88 
89 	err = dnode_hold(os->os, object, FTAG, &dn);
90 	if (err)
91 		return (err);
92 	blkid = dbuf_whichblock(dn, offset);
93 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
94 	db = dbuf_hold(dn, blkid, tag);
95 	rw_exit(&dn->dn_struct_rwlock);
96 	if (db == NULL) {
97 		err = EIO;
98 	} else {
99 		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
100 		if (err) {
101 			dbuf_rele(db, tag);
102 			db = NULL;
103 		}
104 	}
105 
106 	dnode_rele(dn, FTAG);
107 	*dbp = &db->db;
108 	return (err);
109 }
110 
111 int
112 dmu_bonus_max(void)
113 {
114 	return (DN_MAX_BONUSLEN);
115 }
116 
117 /*
118  * returns ENOENT, EIO, or 0.
119  */
120 int
121 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
122 {
123 	dnode_t *dn;
124 	int err, count;
125 	dmu_buf_impl_t *db;
126 
127 	err = dnode_hold(os->os, object, FTAG, &dn);
128 	if (err)
129 		return (err);
130 
131 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
132 	if (dn->dn_bonus == NULL) {
133 		rw_exit(&dn->dn_struct_rwlock);
134 		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
135 		if (dn->dn_bonus == NULL)
136 			dn->dn_bonus = dbuf_create_bonus(dn);
137 	}
138 	db = dn->dn_bonus;
139 	rw_exit(&dn->dn_struct_rwlock);
140 	mutex_enter(&db->db_mtx);
141 	count = refcount_add(&db->db_holds, tag);
142 	mutex_exit(&db->db_mtx);
143 	if (count == 1)
144 		dnode_add_ref(dn, db);
145 	dnode_rele(dn, FTAG);
146 
147 	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
148 
149 	*dbp = &db->db;
150 	return (0);
151 }
152 
/*
 * Hold every data block of `dn' that overlaps [offset, offset + length)
 * and, if `read' is set, read them in.
 *
 * On success returns 0, stores a newly allocated array of held buffers
 * in *dbpp and the number of entries in *numbufsp; the caller releases
 * both with dmu_buf_rele_array().  On failure returns an errno (EIO if
 * a dbuf cannot be held or ends up DB_UNCACHED, otherwise the zio_wait()
 * error); every hold taken so far and the array are released before
 * returning.
 *
 * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
 * to take a held dnode rather than <os, object> -- the lookup is wasteful,
 * and can induce severe lock contention when writing to several files
 * whose dnodes are in the same block.
 */
static int
dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset,
    uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
{
	dmu_buf_t **dbp;
	uint64_t blkid, nblks, i;
	uint32_t flags;
	int err;
	zio_t *zio;

	ASSERT(length <= DMU_MAX_ACCESS);

	/* requests larger than zfetch_array_rd_sz also pass NOPREFETCH */
	flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT;
	if (length > zfetch_array_rd_sz)
		flags |= DB_RF_NOPREFETCH;

	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	if (dn->dn_datablkshift) {
		/* count the blocks touched by the range */
		int blkshift = dn->dn_datablkshift;
		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
			P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
	} else {
		/* no block shift: the range must fit in the single block */
		ASSERT3U(offset + length, <=, dn->dn_datablksz);
		nblks = 1;
	}
	/* zeroed so dmu_buf_rele_array() can skip slots never filled in */
	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);

	/* parent zio that collects the reads issued below */
	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, TRUE);
	blkid = dbuf_whichblock(dn, offset);
	for (i = 0; i < nblks; i++) {
		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
		if (db == NULL) {
			rw_exit(&dn->dn_struct_rwlock);
			dmu_buf_rele_array(dbp, nblks, tag);
			zio_nowait(zio);
			return (EIO);
		}
		/* initiate async i/o */
		if (read) {
			/* dn_struct_rwlock is not held across dbuf_read() */
			rw_exit(&dn->dn_struct_rwlock);
			(void) dbuf_read(db, zio, flags);
			rw_enter(&dn->dn_struct_rwlock, RW_READER);
		}
		dbp[i] = &db->db;
	}
	rw_exit(&dn->dn_struct_rwlock);

	/* wait for async i/o */
	err = zio_wait(zio);
	if (err) {
		dmu_buf_rele_array(dbp, nblks, tag);
		return (err);
	}

	/* wait for other io to complete */
	if (read) {
		for (i = 0; i < nblks; i++) {
			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
			mutex_enter(&db->db_mtx);
			/* buffers still in DB_READ or DB_FILL are waited on */
			while (db->db_state == DB_READ ||
			    db->db_state == DB_FILL)
				cv_wait(&db->db_changed, &db->db_mtx);
			/* a buffer that settled as DB_UNCACHED is an error */
			if (db->db_state == DB_UNCACHED)
				err = EIO;
			mutex_exit(&db->db_mtx);
			if (err) {
				dmu_buf_rele_array(dbp, nblks, tag);
				return (err);
			}
		}
	}

	*numbufsp = nblks;
	*dbpp = dbp;
	return (0);
}
235 
236 int
237 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
238     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
239 {
240 	dnode_t *dn;
241 	int err;
242 
243 	err = dnode_hold(os->os, object, FTAG, &dn);
244 	if (err)
245 		return (err);
246 
247 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
248 	    numbufsp, dbpp);
249 
250 	dnode_rele(dn, FTAG);
251 
252 	return (err);
253 }
254 
255 int
256 dmu_buf_hold_array_by_bonus(dmu_buf_t *db, uint64_t offset,
257     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
258 {
259 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
260 	int err;
261 
262 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
263 	    numbufsp, dbpp);
264 
265 	return (err);
266 }
267 
268 void
269 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
270 {
271 	int i;
272 	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
273 
274 	if (numbufs == 0)
275 		return;
276 
277 	for (i = 0; i < numbufs; i++) {
278 		if (dbp[i])
279 			dbuf_rele(dbp[i], tag);
280 	}
281 
282 	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
283 }
284 
285 void
286 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
287 {
288 	dnode_t *dn;
289 	uint64_t blkid;
290 	int nblks, i, err;
291 
292 	if (len == 0) {  /* they're interested in the bonus buffer */
293 		dn = os->os->os_meta_dnode;
294 
295 		if (object == 0 || object >= DN_MAX_OBJECT)
296 			return;
297 
298 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
299 		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
300 		dbuf_prefetch(dn, blkid);
301 		rw_exit(&dn->dn_struct_rwlock);
302 		return;
303 	}
304 
305 	/*
306 	 * XXX - Note, if the dnode for the requested object is not
307 	 * already cached, we will do a *synchronous* read in the
308 	 * dnode_hold() call.  The same is true for any indirects.
309 	 */
310 	err = dnode_hold(os->os, object, FTAG, &dn);
311 	if (err != 0)
312 		return;
313 
314 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
315 	if (dn->dn_datablkshift) {
316 		int blkshift = dn->dn_datablkshift;
317 		nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
318 			P2ALIGN(offset, 1<<blkshift)) >> blkshift;
319 	} else {
320 		nblks = (offset < dn->dn_datablksz);
321 	}
322 
323 	if (nblks != 0) {
324 		blkid = dbuf_whichblock(dn, offset);
325 		for (i = 0; i < nblks; i++)
326 			dbuf_prefetch(dn, blkid+i);
327 	}
328 
329 	rw_exit(&dn->dn_struct_rwlock);
330 
331 	dnode_rele(dn, FTAG);
332 }
333 
334 int
335 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
336     uint64_t size, dmu_tx_t *tx)
337 {
338 	dnode_t *dn;
339 	int err = dnode_hold(os->os, object, FTAG, &dn);
340 	if (err)
341 		return (err);
342 	ASSERT(offset < UINT64_MAX);
343 	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
344 	dnode_free_range(dn, offset, size, tx);
345 	dnode_rele(dn, FTAG);
346 	return (0);
347 }
348 
349 int
350 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
351     void *buf)
352 {
353 	dnode_t *dn;
354 	dmu_buf_t **dbp;
355 	int numbufs, i, err;
356 
357 	/*
358 	 * Deal with odd block sizes, where there can't be data past the
359 	 * first block.
360 	 */
361 	err = dnode_hold(os->os, object, FTAG, &dn);
362 	if (err)
363 		return (err);
364 	if (dn->dn_datablkshift == 0) {
365 		int newsz = offset > dn->dn_datablksz ? 0 :
366 		    MIN(size, dn->dn_datablksz - offset);
367 		bzero((char *)buf + newsz, size - newsz);
368 		size = newsz;
369 	}
370 	dnode_rele(dn, FTAG);
371 
372 	while (size > 0) {
373 		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
374 		int err;
375 
376 		/*
377 		 * NB: we could do this block-at-a-time, but it's nice
378 		 * to be reading in parallel.
379 		 */
380 		err = dmu_buf_hold_array(os, object, offset, mylen,
381 		    TRUE, FTAG, &numbufs, &dbp);
382 		if (err)
383 			return (err);
384 
385 		for (i = 0; i < numbufs; i++) {
386 			int tocpy;
387 			int bufoff;
388 			dmu_buf_t *db = dbp[i];
389 
390 			ASSERT(size > 0);
391 
392 			bufoff = offset - db->db_offset;
393 			tocpy = (int)MIN(db->db_size - bufoff, size);
394 
395 			bcopy((char *)db->db_data + bufoff, buf, tocpy);
396 
397 			offset += tocpy;
398 			size -= tocpy;
399 			buf = (char *)buf + tocpy;
400 		}
401 		dmu_buf_rele_array(dbp, numbufs, FTAG);
402 	}
403 	return (0);
404 }
405 
406 void
407 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
408     const void *buf, dmu_tx_t *tx)
409 {
410 	dmu_buf_t **dbp;
411 	int numbufs, i;
412 
413 	if (size == 0)
414 		return;
415 
416 	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
417 	    FALSE, FTAG, &numbufs, &dbp));
418 
419 	for (i = 0; i < numbufs; i++) {
420 		int tocpy;
421 		int bufoff;
422 		dmu_buf_t *db = dbp[i];
423 
424 		ASSERT(size > 0);
425 
426 		bufoff = offset - db->db_offset;
427 		tocpy = (int)MIN(db->db_size - bufoff, size);
428 
429 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
430 
431 		if (tocpy == db->db_size)
432 			dmu_buf_will_fill(db, tx);
433 		else
434 			dmu_buf_will_dirty(db, tx);
435 
436 		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
437 
438 		if (tocpy == db->db_size)
439 			dmu_buf_fill_done(db, tx);
440 
441 		offset += tocpy;
442 		size -= tocpy;
443 		buf = (char *)buf + tocpy;
444 	}
445 	dmu_buf_rele_array(dbp, numbufs, FTAG);
446 }
447 
448 #ifdef _KERNEL
449 int
450 dmu_write_uio(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
451     uio_t *uio, dmu_tx_t *tx)
452 {
453 	dmu_buf_t **dbp;
454 	int numbufs, i;
455 	int err = 0;
456 
457 	if (size == 0)
458 		return (0);
459 
460 	err = dmu_buf_hold_array(os, object, offset, size,
461 	    FALSE, FTAG, &numbufs, &dbp);
462 	if (err)
463 		return (err);
464 
465 	for (i = 0; i < numbufs; i++) {
466 		int tocpy;
467 		int bufoff;
468 		dmu_buf_t *db = dbp[i];
469 
470 		ASSERT(size > 0);
471 
472 		bufoff = offset - db->db_offset;
473 		tocpy = (int)MIN(db->db_size - bufoff, size);
474 
475 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
476 
477 		if (tocpy == db->db_size)
478 			dmu_buf_will_fill(db, tx);
479 		else
480 			dmu_buf_will_dirty(db, tx);
481 
482 		/*
483 		 * XXX uiomove could block forever (eg. nfs-backed
484 		 * pages).  There needs to be a uiolockdown() function
485 		 * to lock the pages in memory, so that uiomove won't
486 		 * block.
487 		 */
488 		err = uiomove((char *)db->db_data + bufoff, tocpy,
489 		    UIO_WRITE, uio);
490 
491 		if (tocpy == db->db_size)
492 			dmu_buf_fill_done(db, tx);
493 
494 		if (err)
495 			break;
496 
497 		offset += tocpy;
498 		size -= tocpy;
499 	}
500 	dmu_buf_rele_array(dbp, numbufs, FTAG);
501 	return (err);
502 }
503 
504 int
505 dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
506     page_t *pp, dmu_tx_t *tx)
507 {
508 	dmu_buf_t **dbp;
509 	int numbufs, i;
510 	int err;
511 
512 	if (size == 0)
513 		return (0);
514 
515 	err = dmu_buf_hold_array(os, object, offset, size,
516 	    FALSE, FTAG, &numbufs, &dbp);
517 	if (err)
518 		return (err);
519 
520 	for (i = 0; i < numbufs; i++) {
521 		int tocpy, copied, thiscpy;
522 		int bufoff;
523 		dmu_buf_t *db = dbp[i];
524 		caddr_t va;
525 
526 		ASSERT(size > 0);
527 		ASSERT3U(db->db_size, >=, PAGESIZE);
528 
529 		bufoff = offset - db->db_offset;
530 		tocpy = (int)MIN(db->db_size - bufoff, size);
531 
532 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
533 
534 		if (tocpy == db->db_size)
535 			dmu_buf_will_fill(db, tx);
536 		else
537 			dmu_buf_will_dirty(db, tx);
538 
539 		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
540 			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
541 			thiscpy = MIN(PAGESIZE, tocpy - copied);
542 			va = ppmapin(pp, PROT_READ, (caddr_t)-1);
543 			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
544 			ppmapout(va);
545 			pp = pp->p_next;
546 			bufoff += PAGESIZE;
547 		}
548 
549 		if (tocpy == db->db_size)
550 			dmu_buf_fill_done(db, tx);
551 
552 		if (err)
553 			break;
554 
555 		offset += tocpy;
556 		size -= tocpy;
557 	}
558 	dmu_buf_rele_array(dbp, numbufs, FTAG);
559 	return (err);
560 }
561 #endif
562 
563 /*
564  * XXX move send/recv stuff to its own new file!
565  */
566 
/*
 * State shared by dmu_sendbackup() and the dump_*() helpers while a
 * backup stream is being written.
 */
struct backuparg {
	dmu_replay_record_t *drr;	/* scratch record the dump_*()s fill */
	vnode_t *vp;			/* vnode the stream is written to */
	objset_t *os;			/* objset being sent */
	zio_cksum_t zc;			/* running checksum of bytes dumped */
	int err;			/* vn_rdwr() result of last dump_bytes() */
};
574 
575 static int
576 dump_bytes(struct backuparg *ba, void *buf, int len)
577 {
578 	ssize_t resid; /* have to get resid to get detailed errno */
579 	ASSERT3U(len % 8, ==, 0);
580 
581 	fletcher_4_incremental_native(buf, len, &ba->zc);
582 	ba->err = vn_rdwr(UIO_WRITE, ba->vp,
583 	    (caddr_t)buf, len,
584 	    0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);
585 	return (ba->err);
586 }
587 
588 static int
589 dump_free(struct backuparg *ba, uint64_t object, uint64_t offset,
590     uint64_t length)
591 {
592 	/* write a FREE record */
593 	bzero(ba->drr, sizeof (dmu_replay_record_t));
594 	ba->drr->drr_type = DRR_FREE;
595 	ba->drr->drr_u.drr_free.drr_object = object;
596 	ba->drr->drr_u.drr_free.drr_offset = offset;
597 	ba->drr->drr_u.drr_free.drr_length = length;
598 
599 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
600 		return (EINTR);
601 	return (0);
602 }
603 
604 static int
605 dump_data(struct backuparg *ba, dmu_object_type_t type,
606     uint64_t object, uint64_t offset, int blksz, void *data)
607 {
608 	/* write a DATA record */
609 	bzero(ba->drr, sizeof (dmu_replay_record_t));
610 	ba->drr->drr_type = DRR_WRITE;
611 	ba->drr->drr_u.drr_write.drr_object = object;
612 	ba->drr->drr_u.drr_write.drr_type = type;
613 	ba->drr->drr_u.drr_write.drr_offset = offset;
614 	ba->drr->drr_u.drr_write.drr_length = blksz;
615 
616 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
617 		return (EINTR);
618 	if (dump_bytes(ba, data, blksz))
619 		return (EINTR);
620 	return (0);
621 }
622 
623 static int
624 dump_freeobjects(struct backuparg *ba, uint64_t firstobj, uint64_t numobjs)
625 {
626 	/* write a FREEOBJECTS record */
627 	bzero(ba->drr, sizeof (dmu_replay_record_t));
628 	ba->drr->drr_type = DRR_FREEOBJECTS;
629 	ba->drr->drr_u.drr_freeobjects.drr_firstobj = firstobj;
630 	ba->drr->drr_u.drr_freeobjects.drr_numobjs = numobjs;
631 
632 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
633 		return (EINTR);
634 	return (0);
635 }
636 
637 static int
638 dump_dnode(struct backuparg *ba, uint64_t object, dnode_phys_t *dnp)
639 {
640 	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
641 		return (dump_freeobjects(ba, object, 1));
642 
643 	/* write an OBJECT record */
644 	bzero(ba->drr, sizeof (dmu_replay_record_t));
645 	ba->drr->drr_type = DRR_OBJECT;
646 	ba->drr->drr_u.drr_object.drr_object = object;
647 	ba->drr->drr_u.drr_object.drr_type = dnp->dn_type;
648 	ba->drr->drr_u.drr_object.drr_bonustype = dnp->dn_bonustype;
649 	ba->drr->drr_u.drr_object.drr_blksz =
650 	    dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
651 	ba->drr->drr_u.drr_object.drr_bonuslen = dnp->dn_bonuslen;
652 	ba->drr->drr_u.drr_object.drr_checksum = dnp->dn_checksum;
653 	ba->drr->drr_u.drr_object.drr_compress = dnp->dn_compress;
654 
655 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
656 		return (EINTR);
657 
658 	if (dump_bytes(ba, DN_BONUS(dnp), P2ROUNDUP(dnp->dn_bonuslen, 8)))
659 		return (EINTR);
660 
661 	/* free anything past the end of the file */
662 	if (dump_free(ba, object, (dnp->dn_maxblkid + 1) *
663 	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL))
664 		return (EINTR);
665 	if (ba->err)
666 		return (EINTR);
667 	return (0);
668 }
669 
/*
 * Number of bytes of an object spanned by one block pointer at the given
 * indirection `level' of dnode `dnp': the data block size in bytes,
 * multiplied by the number of block pointers per indirect block once for
 * each level above 0.  `dnp' is parenthesized so that any pointer
 * expression may be passed.
 */
#define	BP_SPAN(dnp, level) \
	(((uint64_t)(dnp)->dn_datablkszsec) << (SPA_MINBLOCKSHIFT + \
	(level) * ((dnp)->dn_indblkshift - SPA_BLKPTRSHIFT)))
673 
/*
 * Traversal callback used by dmu_sendbackup(): turn one visited block
 * into backup-stream records.  `arg' is the struct backuparg.
 *
 *   - hole in object 0:		FREEOBJECTS for the dnodes it spans
 *   - hole in any other object:	FREE for the byte range it spans
 *   - level-0 dnode block with data:	one dump_dnode() per dnode in it
 *   - other level-0 block (not dnode/objset type):  a WRITE record with
 *     its data, reading the block through the ARC if the traversal did
 *     not supply it
 *
 * Anything else produces no records.  Returns 0 or EINTR (a pending
 * signal, or a dump_*() failure).
 */
static int
backup_cb(traverse_blk_cache_t *bc, spa_t *spa, void *arg)
{
	struct backuparg *ba = arg;
	uint64_t object = bc->bc_bookmark.zb_object;
	int level = bc->bc_bookmark.zb_level;
	uint64_t blkid = bc->bc_bookmark.zb_blkid;
	/* a zero birth txg is treated as a hole (bp == NULL) */
	blkptr_t *bp = bc->bc_blkptr.blk_birth ? &bc->bc_blkptr : NULL;
	dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
	void *data = bc->bc_data;
	int err = 0;

	if (issig(JUSTLOOKING) && issig(FORREAL))
		return (EINTR);

	ASSERT(data || bp == NULL);

	if (bp == NULL && object == 0) {
		/* convert the byte span of the hole into a dnode range */
		uint64_t span = BP_SPAN(bc->bc_dnode, level);
		uint64_t dnobj = (blkid * span) >> DNODE_SHIFT;
		err = dump_freeobjects(ba, dnobj, span >> DNODE_SHIFT);
	} else if (bp == NULL) {
		uint64_t span = BP_SPAN(bc->bc_dnode, level);
		err = dump_free(ba, object, blkid * span, span);
	} else if (data && level == 0 && type == DMU_OT_DNODE) {
		dnode_phys_t *blk = data;
		int i;
		int blksz = BP_GET_LSIZE(bp);

		/* walk every dnode slot in this block */
		for (i = 0; i < blksz >> DNODE_SHIFT; i++) {
			uint64_t dnobj =
			    (blkid << (DNODE_BLOCK_SHIFT - DNODE_SHIFT)) + i;
			err = dump_dnode(ba, dnobj, blk+i);
			if (err)
				break;
		}
	} else if (level == 0 &&
	    type != DMU_OT_DNODE && type != DMU_OT_OBJSET) {
		int blksz = BP_GET_LSIZE(bp);
		if (data == NULL) {
			/* the traversal gave us no data: read it ourselves */
			uint32_t aflags = ARC_WAIT;
			arc_buf_t *abuf;
			zbookmark_t zb;

			zb.zb_objset = ba->os->os->os_dsl_dataset->ds_object;
			zb.zb_object = object;
			zb.zb_level = level;
			zb.zb_blkid = blkid;
			(void) arc_read(NULL, spa, bp,
			    dmu_ot[type].ot_byteswap, arc_getbuf_func, &abuf,
			    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_MUSTSUCCEED,
			    &aflags, &zb);

			if (abuf) {
				err = dump_data(ba, type, object, blkid * blksz,
				    blksz, abuf->b_data);
				(void) arc_buf_remove_ref(abuf, &abuf);
			}
		} else {
			err = dump_data(ba, type, object, blkid * blksz,
			    blksz, data);
		}
	}

	ASSERT(err == 0 || err == EINTR);
	return (err);
}
741 
742 int
743 dmu_sendbackup(objset_t *tosnap, objset_t *fromsnap, vnode_t *vp)
744 {
745 	dsl_dataset_t *ds = tosnap->os->os_dsl_dataset;
746 	dsl_dataset_t *fromds = fromsnap ? fromsnap->os->os_dsl_dataset : NULL;
747 	dmu_replay_record_t *drr;
748 	struct backuparg ba;
749 	int err;
750 
751 	/* tosnap must be a snapshot */
752 	if (ds->ds_phys->ds_next_snap_obj == 0)
753 		return (EINVAL);
754 
755 	/* fromsnap must be an earlier snapshot from the same fs as tosnap */
756 	if (fromds && (ds->ds_dir != fromds->ds_dir ||
757 	    fromds->ds_phys->ds_creation_txg >=
758 	    ds->ds_phys->ds_creation_txg))
759 		return (EXDEV);
760 
761 	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
762 	drr->drr_type = DRR_BEGIN;
763 	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
764 	drr->drr_u.drr_begin.drr_version = DMU_BACKUP_VERSION;
765 	drr->drr_u.drr_begin.drr_creation_time =
766 	    ds->ds_phys->ds_creation_time;
767 	drr->drr_u.drr_begin.drr_type = tosnap->os->os_phys->os_type;
768 	drr->drr_u.drr_begin.drr_toguid = ds->ds_phys->ds_guid;
769 	if (fromds)
770 		drr->drr_u.drr_begin.drr_fromguid = fromds->ds_phys->ds_guid;
771 	dsl_dataset_name(ds, drr->drr_u.drr_begin.drr_toname);
772 
773 	ba.drr = drr;
774 	ba.vp = vp;
775 	ba.os = tosnap;
776 	ZIO_SET_CHECKSUM(&ba.zc, 0, 0, 0, 0);
777 
778 	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t))) {
779 		kmem_free(drr, sizeof (dmu_replay_record_t));
780 		return (ba.err);
781 	}
782 
783 	err = traverse_dsl_dataset(ds,
784 	    fromds ? fromds->ds_phys->ds_creation_txg : 0,
785 	    ADVANCE_PRE | ADVANCE_HOLES | ADVANCE_DATA | ADVANCE_NOLOCK,
786 	    backup_cb, &ba);
787 
788 	if (err) {
789 		if (err == EINTR && ba.err)
790 			err = ba.err;
791 		return (err);
792 	}
793 
794 	bzero(drr, sizeof (dmu_replay_record_t));
795 	drr->drr_type = DRR_END;
796 	drr->drr_u.drr_end.drr_checksum = ba.zc;
797 
798 	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t)))
799 		return (ba.err);
800 
801 	kmem_free(drr, sizeof (dmu_replay_record_t));
802 
803 	return (0);
804 }
805 
/*
 * State for reading a backup stream; the buffer fields are maintained
 * by restore_read().
 */
struct restorearg {
	int err; /* error recorded by restore_read() */
	int byteswap; /* nonzero: stream needs byteswapping */
	vnode_t *vp; /* vnode the stream is read from */
	char *buf; /* read buffer */
	uint64_t voff; /* offset in vp of the next vn_rdwr() */
	int buflen; /* number of valid bytes in buf */
	int bufoff; /* next offset to read */
	int bufsize; /* amount of memory allocated for buf */
	zio_cksum_t zc; /* running checksum of bytes handed out */
};
817 
818 /* ARGSUSED */
819 static int
820 replay_incremental_check(void *arg1, void *arg2, dmu_tx_t *tx)
821 {
822 	dsl_dataset_t *ds = arg1;
823 	struct drr_begin *drrb = arg2;
824 	const char *snapname;
825 	int err;
826 	uint64_t val;
827 
828 	/* must already be a snapshot of this fs */
829 	if (ds->ds_phys->ds_prev_snap_obj == 0)
830 		return (ENODEV);
831 
832 	/* most recent snapshot must match fromguid */
833 	if (ds->ds_prev->ds_phys->ds_guid != drrb->drr_fromguid)
834 		return (ENODEV);
835 	/* must not have any changes since most recent snapshot */
836 	if (ds->ds_phys->ds_bp.blk_birth >
837 	    ds->ds_prev->ds_phys->ds_creation_txg)
838 		return (ETXTBSY);
839 
840 	/* new snapshot name must not exist */
841 	snapname = strrchr(drrb->drr_toname, '@');
842 	if (snapname == NULL)
843 		return (EEXIST);
844 
845 	snapname++;
846 	err = zap_lookup(ds->ds_dir->dd_pool->dp_meta_objset,
847 	    ds->ds_phys->ds_snapnames_zapobj, snapname, 8, 1, &val);
848 	if (err == 0)
849 		return (EEXIST);
850 	if (err != ENOENT)
851 		return (err);
852 
853 	return (0);
854 }
855 
856 /* ARGSUSED */
857 static void
858 replay_incremental_sync(void *arg1, void *arg2, dmu_tx_t *tx)
859 {
860 	dsl_dataset_t *ds = arg1;
861 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
862 	ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
863 }
864 
865 /* ARGSUSED */
866 static int
867 replay_full_check(void *arg1, void *arg2, dmu_tx_t *tx)
868 {
869 	dsl_dir_t *dd = arg1;
870 	struct drr_begin *drrb = arg2;
871 	objset_t *mos = dd->dd_pool->dp_meta_objset;
872 	char *cp;
873 	uint64_t val;
874 	int err;
875 
876 	cp = strchr(drrb->drr_toname, '@');
877 	*cp = '\0';
878 	err = zap_lookup(mos, dd->dd_phys->dd_child_dir_zapobj,
879 	    strrchr(drrb->drr_toname, '/') + 1,
880 	    sizeof (uint64_t), 1, &val);
881 	*cp = '@';
882 
883 	if (err != ENOENT)
884 		return (err ? err : EEXIST);
885 
886 	return (0);
887 }
888 
889 static void
890 replay_full_sync(void *arg1, void *arg2, dmu_tx_t *tx)
891 {
892 	dsl_dir_t *dd = arg1;
893 	struct drr_begin *drrb = arg2;
894 	char *cp;
895 	dsl_dataset_t *ds;
896 	uint64_t dsobj;
897 
898 	cp = strchr(drrb->drr_toname, '@');
899 	*cp = '\0';
900 	dsobj = dsl_dataset_create_sync(dd, strrchr(drrb->drr_toname, '/') + 1,
901 	    NULL, tx);
902 	*cp = '@';
903 
904 	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool, dsobj, NULL,
905 	    DS_MODE_EXCLUSIVE, FTAG, &ds));
906 
907 	(void) dmu_objset_create_impl(dsl_dataset_get_spa(ds),
908 	    ds, drrb->drr_type, tx);
909 
910 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
911 	ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
912 
913 	dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
914 }
915 
916 static int
917 replay_end_check(void *arg1, void *arg2, dmu_tx_t *tx)
918 {
919 	objset_t *os = arg1;
920 	struct drr_begin *drrb = arg2;
921 	char *snapname;
922 
923 	/* XXX verify that drr_toname is in dd */
924 
925 	snapname = strchr(drrb->drr_toname, '@');
926 	if (snapname == NULL)
927 		return (EINVAL);
928 	snapname++;
929 
930 	return (dsl_dataset_snapshot_check(os, snapname, tx));
931 }
932 
/*
 * Sync half of finishing a restore: snapshot objset `arg1' under the
 * name after the '@' in drr_toname, stamp the new snapshot with the
 * creation time and guid carried in the BEGIN record `arg2', and clear
 * the inconsistent flag on both the snapshot and the head dataset.
 */
static void
replay_end_sync(void *arg1, void *arg2, dmu_tx_t *tx)
{
	objset_t *os = arg1;
	struct drr_begin *drrb = arg2;
	char *snapname;
	dsl_dataset_t *ds, *hds;

	/* the '@' is not checked for here; replay_end_check() rejects names without one */
	snapname = strchr(drrb->drr_toname, '@') + 1;

	dsl_dataset_snapshot_sync(os, snapname, tx);

	/* set snapshot's creation time and guid */
	hds = os->os->os_dsl_dataset;
	/* the snapshot just taken is the head's previous snapshot */
	VERIFY(0 == dsl_dataset_open_obj(hds->ds_dir->dd_pool,
	    hds->ds_phys->ds_prev_snap_obj, NULL,
	    DS_MODE_PRIMARY | DS_MODE_READONLY | DS_MODE_INCONSISTENT,
	    FTAG, &ds));

	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	ds->ds_phys->ds_creation_time = drrb->drr_creation_time;
	ds->ds_phys->ds_guid = drrb->drr_toguid;
	ds->ds_phys->ds_flags &= ~DS_FLAG_INCONSISTENT;

	dsl_dataset_close(ds, DS_MODE_PRIMARY, FTAG);

	dmu_buf_will_dirty(hds->ds_dbuf, tx);
	hds->ds_phys->ds_flags &= ~DS_FLAG_INCONSISTENT;
}
962 
/*
 * Return a pointer to the next `len' bytes of the backup stream,
 * refilling ra->buf from ra->vp as needed, and fold those bytes into
 * the running checksum ra->zc (byteswapped or native according to
 * ra->byteswap).  `len' must be a multiple of 8.
 *
 * The returned pointer refers into ra->buf, so later calls may move or
 * overwrite the data.  Returns NULL with ra->err set if the read fails
 * or a refill reads no new bytes (EINVAL).
 */
void *
restore_read(struct restorearg *ra, int len)
{
	void *rv;

	/* some things will require 8-byte alignment, so everything must */
	ASSERT3U(len % 8, ==, 0);

	while (ra->buflen - ra->bufoff < len) {
		ssize_t resid;
		int leftover = ra->buflen - ra->bufoff;

		/* slide the unconsumed tail to the front, then top up */
		(void) memmove(ra->buf, ra->buf + ra->bufoff, leftover);
		ra->err = vn_rdwr(UIO_READ, ra->vp,
		    (caddr_t)ra->buf + leftover, ra->bufsize - leftover,
		    ra->voff, UIO_SYSSPACE, FAPPEND,
		    RLIM64_INFINITY, CRED(), &resid);

		/* resid is the part of the request that was not filled */
		ra->voff += ra->bufsize - leftover - resid;
		ra->buflen = ra->bufsize - resid;
		ra->bufoff = 0;
		/* nothing new was read: treat as a bad stream */
		if (resid == ra->bufsize - leftover)
			ra->err = EINVAL;
		if (ra->err)
			return (NULL);
		/* Could compute checksum here? */
	}

	ASSERT3U(ra->bufoff % 8, ==, 0);
	ASSERT3U(ra->buflen - ra->bufoff, >=, len);
	rv = ra->buf + ra->bufoff;
	ra->bufoff += len;
	if (ra->byteswap)
		fletcher_4_incremental_byteswap(rv, len, &ra->zc);
	else
		fletcher_4_incremental_native(rv, len, &ra->zc);
	return (rv);
}
1001 
/*
 * Byteswap a replay record header in place.  The record type is swapped
 * first and then selects which union member's fields get swapped.
 * Record types not listed in the switch are left with only drr_type
 * swapped.
 */
static void
backup_byteswap(dmu_replay_record_t *drr)
{
/* swap one 64-bit / 32-bit field of drr->drr_u in place */
#define	DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
#define	DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
	drr->drr_type = BSWAP_32(drr->drr_type);
	switch (drr->drr_type) {
	case DRR_BEGIN:
		DO64(drr_begin.drr_magic);
		DO64(drr_begin.drr_version);
		DO64(drr_begin.drr_creation_time);
		DO32(drr_begin.drr_type);
		DO64(drr_begin.drr_toguid);
		DO64(drr_begin.drr_fromguid);
		break;
	case DRR_OBJECT:
		/*
		 * NOTE(review): drr_checksum and drr_compress are not
		 * swapped here -- presumably single-byte fields; confirm
		 * against the struct drr_object declaration.
		 */
		DO64(drr_object.drr_object);
		/* DO64(drr_object.drr_allocation_txg); */
		DO32(drr_object.drr_type);
		DO32(drr_object.drr_bonustype);
		DO32(drr_object.drr_blksz);
		DO32(drr_object.drr_bonuslen);
		break;
	case DRR_FREEOBJECTS:
		DO64(drr_freeobjects.drr_firstobj);
		DO64(drr_freeobjects.drr_numobjs);
		break;
	case DRR_WRITE:
		DO64(drr_write.drr_object);
		DO32(drr_write.drr_type);
		DO64(drr_write.drr_offset);
		DO64(drr_write.drr_length);
		break;
	case DRR_FREE:
		DO64(drr_free.drr_object);
		DO64(drr_free.drr_offset);
		DO64(drr_free.drr_length);
		break;
	case DRR_END:
		DO64(drr_end.drr_checksum.zc_word[0]);
		DO64(drr_end.drr_checksum.zc_word[1]);
		DO64(drr_end.drr_checksum.zc_word[2]);
		DO64(drr_end.drr_checksum.zc_word[3]);
		break;
	}
#undef DO64
#undef DO32
}
1050 
1051 static int
1052 restore_object(struct restorearg *ra, objset_t *os, struct drr_object *drro)
1053 {
1054 	int err;
1055 	dmu_tx_t *tx;
1056 
1057 	err = dmu_object_info(os, drro->drr_object, NULL);
1058 
1059 	if (err != 0 && err != ENOENT)
1060 		return (EINVAL);
1061 
1062 	if (drro->drr_type == DMU_OT_NONE ||
1063 	    drro->drr_type >= DMU_OT_NUMTYPES ||
1064 	    drro->drr_bonustype >= DMU_OT_NUMTYPES ||
1065 	    drro->drr_checksum >= ZIO_CHECKSUM_FUNCTIONS ||
1066 	    drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
1067 	    P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
1068 	    drro->drr_blksz < SPA_MINBLOCKSIZE ||
1069 	    drro->drr_blksz > SPA_MAXBLOCKSIZE ||
1070 	    drro->drr_bonuslen > DN_MAX_BONUSLEN) {
1071 		return (EINVAL);
1072 	}
1073 
1074 	tx = dmu_tx_create(os);
1075 
1076 	if (err == ENOENT) {
1077 		/* currently free, want to be allocated */
1078 		dmu_tx_hold_bonus(tx, DMU_NEW_OBJECT);
1079 		dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, 1);
1080 		err = dmu_tx_assign(tx, TXG_WAIT);
1081 		if (err) {
1082 			dmu_tx_abort(tx);
1083 			return (err);
1084 		}
1085 		err = dmu_object_claim(os, drro->drr_object,
1086 		    drro->drr_type, drro->drr_blksz,
1087 		    drro->drr_bonustype, drro->drr_bonuslen, tx);
1088 	} else {
1089 		/* currently allocated, want to be allocated */
1090 		dmu_tx_hold_bonus(tx, drro->drr_object);
1091 		/*
1092 		 * We may change blocksize, so need to
1093 		 * hold_write
1094 		 */
1095 		dmu_tx_hold_write(tx, drro->drr_object, 0, 1);
1096 		err = dmu_tx_assign(tx, TXG_WAIT);
1097 		if (err) {
1098 			dmu_tx_abort(tx);
1099 			return (err);
1100 		}
1101 
1102 		err = dmu_object_reclaim(os, drro->drr_object,
1103 		    drro->drr_type, drro->drr_blksz,
1104 		    drro->drr_bonustype, drro->drr_bonuslen, tx);
1105 	}
1106 	if (err) {
1107 		dmu_tx_commit(tx);
1108 		return (EINVAL);
1109 	}
1110 
1111 	dmu_object_set_checksum(os, drro->drr_object, drro->drr_checksum, tx);
1112 	dmu_object_set_compress(os, drro->drr_object, drro->drr_compress, tx);
1113 
1114 	if (drro->drr_bonuslen) {
1115 		dmu_buf_t *db;
1116 		void *data;
1117 		VERIFY(0 == dmu_bonus_hold(os, drro->drr_object, FTAG, &db));
1118 		dmu_buf_will_dirty(db, tx);
1119 
1120 		ASSERT3U(db->db_size, ==, drro->drr_bonuslen);
1121 		data = restore_read(ra, P2ROUNDUP(db->db_size, 8));
1122 		if (data == NULL) {
1123 			dmu_tx_commit(tx);
1124 			return (ra->err);
1125 		}
1126 		bcopy(data, db->db_data, db->db_size);
1127 		if (ra->byteswap) {
1128 			dmu_ot[drro->drr_bonustype].ot_byteswap(db->db_data,
1129 			    drro->drr_bonuslen);
1130 		}
1131 		dmu_buf_rele(db, FTAG);
1132 	}
1133 	dmu_tx_commit(tx);
1134 	return (0);
1135 }
1136 
1137 /* ARGSUSED */
1138 static int
1139 restore_freeobjects(struct restorearg *ra, objset_t *os,
1140     struct drr_freeobjects *drrfo)
1141 {
1142 	uint64_t obj;
1143 
1144 	if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
1145 		return (EINVAL);
1146 
1147 	for (obj = drrfo->drr_firstobj;
1148 	    obj < drrfo->drr_firstobj + drrfo->drr_numobjs; obj++) {
1149 		dmu_tx_t *tx;
1150 		int err;
1151 
1152 		if (dmu_object_info(os, obj, NULL) != 0)
1153 			continue;
1154 
1155 		tx = dmu_tx_create(os);
1156 		dmu_tx_hold_bonus(tx, obj);
1157 		err = dmu_tx_assign(tx, TXG_WAIT);
1158 		if (err) {
1159 			dmu_tx_abort(tx);
1160 			return (err);
1161 		}
1162 		err = dmu_object_free(os, obj, tx);
1163 		dmu_tx_commit(tx);
1164 		if (err && err != ENOENT)
1165 			return (EINVAL);
1166 	}
1167 	return (0);
1168 }
1169 
1170 static int
1171 restore_write(struct restorearg *ra, objset_t *os,
1172     struct drr_write *drrw)
1173 {
1174 	dmu_tx_t *tx;
1175 	void *data;
1176 	int err;
1177 
1178 	if (drrw->drr_offset + drrw->drr_length < drrw->drr_offset ||
1179 	    drrw->drr_type >= DMU_OT_NUMTYPES)
1180 		return (EINVAL);
1181 
1182 	data = restore_read(ra, drrw->drr_length);
1183 	if (data == NULL)
1184 		return (ra->err);
1185 
1186 	if (dmu_object_info(os, drrw->drr_object, NULL) != 0)
1187 		return (EINVAL);
1188 
1189 	tx = dmu_tx_create(os);
1190 
1191 	dmu_tx_hold_write(tx, drrw->drr_object,
1192 	    drrw->drr_offset, drrw->drr_length);
1193 	err = dmu_tx_assign(tx, TXG_WAIT);
1194 	if (err) {
1195 		dmu_tx_abort(tx);
1196 		return (err);
1197 	}
1198 	if (ra->byteswap)
1199 		dmu_ot[drrw->drr_type].ot_byteswap(data, drrw->drr_length);
1200 	dmu_write(os, drrw->drr_object,
1201 	    drrw->drr_offset, drrw->drr_length, data, tx);
1202 	dmu_tx_commit(tx);
1203 	return (0);
1204 }
1205 
1206 /* ARGSUSED */
1207 static int
1208 restore_free(struct restorearg *ra, objset_t *os,
1209     struct drr_free *drrf)
1210 {
1211 	dmu_tx_t *tx;
1212 	int err;
1213 
1214 	if (drrf->drr_length != -1ULL &&
1215 	    drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
1216 		return (EINVAL);
1217 
1218 	if (dmu_object_info(os, drrf->drr_object, NULL) != 0)
1219 		return (EINVAL);
1220 
1221 	tx = dmu_tx_create(os);
1222 
1223 	dmu_tx_hold_free(tx, drrf->drr_object,
1224 	    drrf->drr_offset, drrf->drr_length);
1225 	err = dmu_tx_assign(tx, TXG_WAIT);
1226 	if (err) {
1227 		dmu_tx_abort(tx);
1228 		return (err);
1229 	}
1230 	err = dmu_free_range(os, drrf->drr_object,
1231 	    drrf->drr_offset, drrf->drr_length, tx);
1232 	dmu_tx_commit(tx);
1233 	return (err);
1234 }
1235 
1236 int
1237 dmu_recvbackup(char *tosnap, struct drr_begin *drrb, uint64_t *sizep,
1238     boolean_t force, vnode_t *vp, uint64_t voffset)
1239 {
1240 	struct restorearg ra;
1241 	dmu_replay_record_t *drr;
1242 	char *cp;
1243 	objset_t *os = NULL;
1244 	zio_cksum_t pzc;
1245 
1246 	bzero(&ra, sizeof (ra));
1247 	ra.vp = vp;
1248 	ra.voff = voffset;
1249 	ra.bufsize = 1<<20;
1250 	ra.buf = kmem_alloc(ra.bufsize, KM_SLEEP);
1251 
1252 	if (drrb->drr_magic == DMU_BACKUP_MAGIC) {
1253 		ra.byteswap = FALSE;
1254 	} else if (drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
1255 		ra.byteswap = TRUE;
1256 	} else {
1257 		ra.err = EINVAL;
1258 		goto out;
1259 	}
1260 
1261 	/*
1262 	 * NB: this assumes that struct drr_begin will be the largest in
1263 	 * dmu_replay_record_t's drr_u, and thus we don't need to pad it
1264 	 * with zeros to make it the same length as we wrote out.
1265 	 */
1266 	((dmu_replay_record_t *)ra.buf)->drr_type = DRR_BEGIN;
1267 	((dmu_replay_record_t *)ra.buf)->drr_pad = 0;
1268 	((dmu_replay_record_t *)ra.buf)->drr_u.drr_begin = *drrb;
1269 	if (ra.byteswap) {
1270 		fletcher_4_incremental_byteswap(ra.buf,
1271 		    sizeof (dmu_replay_record_t), &ra.zc);
1272 	} else {
1273 		fletcher_4_incremental_native(ra.buf,
1274 		    sizeof (dmu_replay_record_t), &ra.zc);
1275 	}
1276 	(void) strcpy(drrb->drr_toname, tosnap); /* for the sync funcs */
1277 
1278 	if (ra.byteswap) {
1279 		drrb->drr_magic = BSWAP_64(drrb->drr_magic);
1280 		drrb->drr_version = BSWAP_64(drrb->drr_version);
1281 		drrb->drr_creation_time = BSWAP_64(drrb->drr_creation_time);
1282 		drrb->drr_type = BSWAP_32(drrb->drr_type);
1283 		drrb->drr_toguid = BSWAP_64(drrb->drr_toguid);
1284 		drrb->drr_fromguid = BSWAP_64(drrb->drr_fromguid);
1285 	}
1286 
1287 	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
1288 
1289 	if (drrb->drr_version != DMU_BACKUP_VERSION ||
1290 	    drrb->drr_type >= DMU_OST_NUMTYPES ||
1291 	    strchr(drrb->drr_toname, '@') == NULL) {
1292 		ra.err = EINVAL;
1293 		goto out;
1294 	}
1295 
1296 	/*
1297 	 * Process the begin in syncing context.
1298 	 */
1299 	if (drrb->drr_fromguid) {
1300 		/* incremental backup */
1301 		dsl_dataset_t *ds = NULL;
1302 
1303 		cp = strchr(tosnap, '@');
1304 		*cp = '\0';
1305 		ra.err = dsl_dataset_open(tosnap, DS_MODE_EXCLUSIVE, FTAG, &ds);
1306 		*cp = '@';
1307 		if (ra.err)
1308 			goto out;
1309 
1310 		/*
1311 		 * Only do the rollback if the most recent snapshot
1312 		 * matches the incremental source
1313 		 */
1314 		if (force) {
1315 			if (ds->ds_prev->ds_phys->ds_guid !=
1316 			    drrb->drr_fromguid) {
1317 				dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
1318 				return (ENODEV);
1319 			}
1320 			(void) dsl_dataset_rollback(ds);
1321 		}
1322 		ra.err = dsl_sync_task_do(ds->ds_dir->dd_pool,
1323 		    replay_incremental_check, replay_incremental_sync,
1324 		    ds, drrb, 1);
1325 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
1326 	} else {
1327 		/* full backup */
1328 		dsl_dir_t *dd = NULL;
1329 		const char *tail;
1330 
1331 		/* can't restore full backup into topmost fs, for now */
1332 		if (strrchr(drrb->drr_toname, '/') == NULL) {
1333 			ra.err = EINVAL;
1334 			goto out;
1335 		}
1336 
1337 		cp = strchr(tosnap, '@');
1338 		*cp = '\0';
1339 		ra.err = dsl_dir_open(tosnap, FTAG, &dd, &tail);
1340 		*cp = '@';
1341 		if (ra.err)
1342 			goto out;
1343 		if (tail == NULL) {
1344 			ra.err = EEXIST;
1345 			goto out;
1346 		}
1347 
1348 		ra.err = dsl_sync_task_do(dd->dd_pool, replay_full_check,
1349 		    replay_full_sync, dd, drrb, 5);
1350 		dsl_dir_close(dd, FTAG);
1351 	}
1352 	if (ra.err)
1353 		goto out;
1354 
1355 	/*
1356 	 * Open the objset we are modifying.
1357 	 */
1358 
1359 	cp = strchr(tosnap, '@');
1360 	*cp = '\0';
1361 	ra.err = dmu_objset_open(tosnap, DMU_OST_ANY,
1362 	    DS_MODE_PRIMARY | DS_MODE_INCONSISTENT, &os);
1363 	*cp = '@';
1364 	ASSERT3U(ra.err, ==, 0);
1365 
1366 	/*
1367 	 * Read records and process them.
1368 	 */
1369 	pzc = ra.zc;
1370 	while (ra.err == 0 &&
1371 	    NULL != (drr = restore_read(&ra, sizeof (*drr)))) {
1372 		if (issig(JUSTLOOKING) && issig(FORREAL)) {
1373 			ra.err = EINTR;
1374 			goto out;
1375 		}
1376 
1377 		if (ra.byteswap)
1378 			backup_byteswap(drr);
1379 
1380 		switch (drr->drr_type) {
1381 		case DRR_OBJECT:
1382 		{
1383 			/*
1384 			 * We need to make a copy of the record header,
1385 			 * because restore_{object,write} may need to
1386 			 * restore_read(), which will invalidate drr.
1387 			 */
1388 			struct drr_object drro = drr->drr_u.drr_object;
1389 			ra.err = restore_object(&ra, os, &drro);
1390 			break;
1391 		}
1392 		case DRR_FREEOBJECTS:
1393 		{
1394 			struct drr_freeobjects drrfo =
1395 			    drr->drr_u.drr_freeobjects;
1396 			ra.err = restore_freeobjects(&ra, os, &drrfo);
1397 			break;
1398 		}
1399 		case DRR_WRITE:
1400 		{
1401 			struct drr_write drrw = drr->drr_u.drr_write;
1402 			ra.err = restore_write(&ra, os, &drrw);
1403 			break;
1404 		}
1405 		case DRR_FREE:
1406 		{
1407 			struct drr_free drrf = drr->drr_u.drr_free;
1408 			ra.err = restore_free(&ra, os, &drrf);
1409 			break;
1410 		}
1411 		case DRR_END:
1412 		{
1413 			struct drr_end drre = drr->drr_u.drr_end;
1414 			/*
1415 			 * We compare against the *previous* checksum
1416 			 * value, because the stored checksum is of
1417 			 * everything before the DRR_END record.
1418 			 */
1419 			if (drre.drr_checksum.zc_word[0] != 0 &&
1420 			    ((drre.drr_checksum.zc_word[0] - pzc.zc_word[0]) |
1421 			    (drre.drr_checksum.zc_word[1] - pzc.zc_word[1]) |
1422 			    (drre.drr_checksum.zc_word[2] - pzc.zc_word[2]) |
1423 			    (drre.drr_checksum.zc_word[3] - pzc.zc_word[3]))) {
1424 				ra.err = ECKSUM;
1425 				goto out;
1426 			}
1427 
1428 			ra.err = dsl_sync_task_do(dmu_objset_ds(os)->
1429 			    ds_dir->dd_pool, replay_end_check, replay_end_sync,
1430 			    os, drrb, 3);
1431 			goto out;
1432 		}
1433 		default:
1434 			ra.err = EINVAL;
1435 			goto out;
1436 		}
1437 		pzc = ra.zc;
1438 	}
1439 
1440 out:
1441 	if (os)
1442 		dmu_objset_close(os);
1443 
1444 	/*
1445 	 * Make sure we don't rollback/destroy unless we actually
1446 	 * processed the begin properly.  'os' will only be set if this
1447 	 * is the case.
1448 	 */
1449 	if (ra.err && os && tosnap && strchr(tosnap, '@')) {
1450 		/*
1451 		 * rollback or destroy what we created, so we don't
1452 		 * leave it in the restoring state.
1453 		 */
1454 		dsl_dataset_t *ds;
1455 		int err;
1456 
1457 		cp = strchr(tosnap, '@');
1458 		*cp = '\0';
1459 		err = dsl_dataset_open(tosnap,
1460 		    DS_MODE_EXCLUSIVE | DS_MODE_INCONSISTENT,
1461 		    FTAG, &ds);
1462 		if (err == 0) {
1463 			txg_wait_synced(ds->ds_dir->dd_pool, 0);
1464 			if (drrb->drr_fromguid) {
1465 				/* incremental: rollback to most recent snap */
1466 				(void) dsl_dataset_rollback(ds);
1467 				dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
1468 			} else {
1469 				/* full: destroy whole fs */
1470 				dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
1471 				(void) dsl_dataset_destroy(tosnap);
1472 			}
1473 		}
1474 		*cp = '@';
1475 	}
1476 
1477 	kmem_free(ra.buf, ra.bufsize);
1478 	if (sizep)
1479 		*sizep = ra.voff;
1480 	return (ra.err);
1481 }
1482 
/*
 * Arguments handed from dmu_sync() to its write-completion callback,
 * dmu_sync_done().
 */
typedef struct {
	uint64_t	txg;	/* txg the sync write was issued for */
	dmu_buf_impl_t	*db;	/* dbuf being written */
	dmu_sync_cb_t	*done;	/* caller's callback; may be NULL */
	void		*arg;	/* argument passed through to 'done' */
} dmu_sync_cbin_t;

/*
 * Overlay of the callback input with a blkptr_t.  dmu_sync_done()
 * reads the input fields and then stores the resulting block pointer
 * into the same buffer.  NOTE(review): this union type is not
 * referenced by name in the code visible here; dmu_sync() allocates
 * sizeof (blkptr_t) directly.
 */
typedef union {
	dmu_sync_cbin_t	data;
	blkptr_t	blk;
} dmu_sync_cbarg_t;
1494 
/*
 * Completion callback for the write issued by dmu_sync().
 *
 * 'varg' is the buffer allocated in dmu_sync(); it arrives holding a
 * dmu_sync_cbin_t and leaves holding the written block pointer, which
 * is recorded in the dbuf's db_overridden_by[] slot for the txg.
 * Waiters on db_changed are woken, and the caller's 'done' callback
 * (if any) is invoked.
 */
/* ARGSUSED */
static void
dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
{
	/*
	 * Copy every input field out first: the same memory is
	 * overwritten with the blkptr below.
	 */
	dmu_sync_cbin_t *in = (dmu_sync_cbin_t *)varg;
	dmu_buf_impl_t *db = in->db;
	uint64_t txg = in->txg;
	dmu_sync_cb_t *done = in->done;
	void *arg = in->arg;
	blkptr_t *blk = (blkptr_t *)varg;

	/* fill in the blkptr fields for a level-0 data block */
	if (!BP_IS_HOLE(zio->io_bp)) {
		zio->io_bp->blk_fill = 1;
		BP_SET_TYPE(zio->io_bp, db->db_dnode->dn_type);
		BP_SET_LEVEL(zio->io_bp, 0);
	}

	*blk = *zio->io_bp; /* structure assignment */

	/* replace the IN_DMU_SYNC marker with the real block pointer */
	mutex_enter(&db->db_mtx);
	ASSERT(db->db_d.db_overridden_by[txg&TXG_MASK] == IN_DMU_SYNC);
	db->db_d.db_overridden_by[txg&TXG_MASK] = blk;
	cv_broadcast(&db->db_changed);
	mutex_exit(&db->db_mtx);

	if (done)
		done(&(db->db), arg);
}
1523 
/*
 * Intent log support: sync the block associated with db to disk.
 * N.B. and XXX: the caller is responsible for making sure that the
 * data isn't changing while dmu_sync() is writing it.
 *
 * If 'pio' is NULL the write is issued with ARC_WAIT; otherwise it is
 * issued with ARC_NOWAIT and 'done' is called with 'arg' from the
 * completion callback.
 *
 * Return values:
 *
 *	EEXIST: this txg has already been synced, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	EALREADY: this block is already in the process of being synced.
 *		The caller should track its progress (somehow).
 *
 *	EINPROGRESS: the IO has been initiated.
 *		The caller should log this blkptr in the callback.
 *
 *	0: completed.  Sets *bp to the blkptr just written.
 *		The caller should log this blkptr immediately.
 */
int
dmu_sync(zio_t *pio, dmu_buf_t *db_fake,
    blkptr_t *bp, uint64_t txg, dmu_sync_cb_t *done, void *arg)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
	objset_impl_t *os = db->db_objset;
	dsl_pool_t *dp = os->os_dsl_dataset->ds_dir->dd_pool;
	tx_state_t *tx = &dp->dp_tx;
	dmu_sync_cbin_t *in;
	blkptr_t *blk;
	zbookmark_t zb;
	uint32_t arc_flag;
	int err;

	ASSERT(BP_IS_HOLE(bp));
	ASSERT(txg != 0);


	dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
	    txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);

	/*
	 * XXX - would be nice if we could do this without suspending...
	 */
	txg_suspend(dp);

	/*
	 * If this txg already synced, there's nothing to do.
	 */
	if (txg <= tx->tx_synced_txg) {
		txg_resume(dp);
		/*
		 * If we're running ziltest, we need the blkptr regardless.
		 */
		if (txg > spa_freeze_txg(dp->dp_spa)) {
			/* if db_blkptr == NULL, this was an empty write */
			if (db->db_blkptr)
				*bp = *db->db_blkptr; /* structure assignment */
			return (0);
		}
		return (EEXIST);
	}

	mutex_enter(&db->db_mtx);

	/* has a previous dmu_sync() already handled this buffer? */
	blk = db->db_d.db_overridden_by[txg&TXG_MASK];
	if (blk == IN_DMU_SYNC) {
		/*
		 * We have already issued a sync write for this buffer.
		 */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (EALREADY);
	} else if (blk != NULL) {
		/*
		 * This buffer had already been synced.  It could not
		 * have been dirtied since, or we would have cleared blk.
		 */
		*bp = *blk; /* structure assignment */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (0);
	}

	if (txg == tx->tx_syncing_txg) {
		while (db->db_data_pending) {
			/*
			 * IO is in-progress.  Wait for it to finish.
			 * XXX - would be nice to be able to somehow "attach"
			 * this zio to the parent zio passed in.
			 */
			cv_wait(&db->db_changed, &db->db_mtx);
			if (!db->db_data_pending &&
			    db->db_blkptr && BP_IS_HOLE(db->db_blkptr)) {
				/*
				 * IO was compressed away
				 */
				*bp = *db->db_blkptr; /* structure assignment */
				mutex_exit(&db->db_mtx);
				txg_resume(dp);
				return (0);
			}
			ASSERT(db->db_data_pending ||
			    (db->db_blkptr && db->db_blkptr->blk_birth == txg));
		}

		if (db->db_blkptr && db->db_blkptr->blk_birth == txg) {
			/*
			 * IO is already completed.
			 */
			*bp = *db->db_blkptr; /* structure assignment */
			mutex_exit(&db->db_mtx);
			txg_resume(dp);
			return (0);
		}
	}

	if (db->db_d.db_data_old[txg&TXG_MASK] == NULL) {
		/*
		 * This dbuf isn't dirty, must have been free_range'd.
		 * There's no need to log writes to freed blocks, so we're done.
		 */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (ENOENT);
	}

	/* mark the buffer as having a sync write in flight */
	ASSERT(db->db_d.db_overridden_by[txg&TXG_MASK] == NULL);
	db->db_d.db_overridden_by[txg&TXG_MASK] = IN_DMU_SYNC;
	/*
	 * XXX - a little ugly to stash the blkptr in the callback
	 * buffer.  We always need to make sure the following is true:
	 * ASSERT(sizeof(blkptr_t) >= sizeof(dmu_sync_cbin_t));
	 */
	in = kmem_alloc(sizeof (blkptr_t), KM_SLEEP);
	in->db = db;
	in->txg = txg;
	in->done = done;
	in->arg = arg;
	mutex_exit(&db->db_mtx);
	txg_resume(dp);

	/* with no parent zio, wait for the write to complete */
	arc_flag = pio == NULL ? ARC_WAIT : ARC_NOWAIT;
	zb.zb_objset = os->os_dsl_dataset->ds_object;
	zb.zb_object = db->db.db_object;
	zb.zb_level = db->db_level;
	zb.zb_blkid = db->db_blkid;
	err = arc_write(pio, os->os_spa,
	    zio_checksum_select(db->db_dnode->dn_checksum, os->os_checksum),
	    zio_compress_select(db->db_dnode->dn_compress, os->os_compress),
	    dmu_get_replication_level(os->os_spa, &zb, db->db_dnode->dn_type),
	    txg, bp, db->db_d.db_data_old[txg&TXG_MASK], dmu_sync_done, in,
	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, arc_flag, &zb);
	ASSERT(err == 0);

	return (arc_flag == ARC_NOWAIT ? EINPROGRESS : 0);
}
1683 
1684 uint64_t
1685 dmu_object_max_nonzero_offset(objset_t *os, uint64_t object)
1686 {
1687 	dnode_t *dn;
1688 
1689 	/* XXX assumes dnode_hold will not get an i/o error */
1690 	(void) dnode_hold(os->os, object, FTAG, &dn);
1691 	uint64_t rv = dnode_max_nonzero_offset(dn);
1692 	dnode_rele(dn, FTAG);
1693 	return (rv);
1694 }
1695 
1696 int
1697 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
1698 	dmu_tx_t *tx)
1699 {
1700 	dnode_t *dn;
1701 	int err;
1702 
1703 	err = dnode_hold(os->os, object, FTAG, &dn);
1704 	if (err)
1705 		return (err);
1706 	err = dnode_set_blksz(dn, size, ibs, tx);
1707 	dnode_rele(dn, FTAG);
1708 	return (err);
1709 }
1710 
1711 void
1712 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
1713 	dmu_tx_t *tx)
1714 {
1715 	dnode_t *dn;
1716 
1717 	/* XXX assumes dnode_hold will not get an i/o error */
1718 	(void) dnode_hold(os->os, object, FTAG, &dn);
1719 	ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
1720 	dn->dn_checksum = checksum;
1721 	dnode_setdirty(dn, tx);
1722 	dnode_rele(dn, FTAG);
1723 }
1724 
1725 void
1726 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
1727 	dmu_tx_t *tx)
1728 {
1729 	dnode_t *dn;
1730 
1731 	/* XXX assumes dnode_hold will not get an i/o error */
1732 	(void) dnode_hold(os->os, object, FTAG, &dn);
1733 	ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
1734 	dn->dn_compress = compress;
1735 	dnode_setdirty(dn, tx);
1736 	dnode_rele(dn, FTAG);
1737 }
1738 
1739 /*
1740  * XXX - eventually, this should take into account per-dataset (or
1741  *       even per-object?) user requests for higher levels of replication.
1742  */
1743 int
1744 dmu_get_replication_level(spa_t *spa, zbookmark_t *zb, dmu_object_type_t ot)
1745 {
1746 	int ncopies = 1;
1747 
1748 	if (dmu_ot[ot].ot_metadata)
1749 		ncopies++;
1750 	if (zb->zb_level != 0)
1751 		ncopies++;
1752 	if (zb->zb_objset == 0 && zb->zb_object == 0)
1753 		ncopies++;
1754 	return (MIN(ncopies, spa_max_replication(spa)));
1755 }
1756 
1757 int
1758 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
1759 {
1760 	dnode_t *dn;
1761 	int i, err;
1762 
1763 	err = dnode_hold(os->os, object, FTAG, &dn);
1764 	if (err)
1765 		return (err);
1766 	/*
1767 	 * Sync any current changes before
1768 	 * we go trundling through the block pointers.
1769 	 */
1770 	for (i = 0; i < TXG_SIZE; i++) {
1771 		if (list_link_active(&dn->dn_dirty_link[i]))
1772 			break;
1773 	}
1774 	if (i != TXG_SIZE) {
1775 		dnode_rele(dn, FTAG);
1776 		txg_wait_synced(dmu_objset_pool(os), 0);
1777 		err = dnode_hold(os->os, object, FTAG, &dn);
1778 		if (err)
1779 			return (err);
1780 	}
1781 
1782 	err = dnode_next_offset(dn, hole, off, 1, 1);
1783 	dnode_rele(dn, FTAG);
1784 
1785 	return (err);
1786 }
1787 
1788 void
1789 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
1790 {
1791 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
1792 	mutex_enter(&dn->dn_mtx);
1793 
1794 	doi->doi_data_block_size = dn->dn_datablksz;
1795 	doi->doi_metadata_block_size = dn->dn_indblkshift ?
1796 	    1ULL << dn->dn_indblkshift : 0;
1797 	doi->doi_indirection = dn->dn_nlevels;
1798 	doi->doi_checksum = dn->dn_checksum;
1799 	doi->doi_compress = dn->dn_compress;
1800 	doi->doi_physical_blks = (DN_USED_BYTES(dn->dn_phys) +
1801 	    SPA_MINBLOCKSIZE/2) >> SPA_MINBLOCKSHIFT;
1802 	doi->doi_max_block_offset = dn->dn_phys->dn_maxblkid;
1803 	doi->doi_type = dn->dn_type;
1804 	doi->doi_bonus_size = dn->dn_bonuslen;
1805 	doi->doi_bonus_type = dn->dn_bonustype;
1806 
1807 	mutex_exit(&dn->dn_mtx);
1808 	rw_exit(&dn->dn_struct_rwlock);
1809 }
1810 
1811 /*
1812  * Get information on a DMU object.
1813  * If doi is NULL, just indicates whether the object exists.
1814  */
1815 int
1816 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
1817 {
1818 	dnode_t *dn;
1819 	int err = dnode_hold(os->os, object, FTAG, &dn);
1820 
1821 	if (err)
1822 		return (err);
1823 
1824 	if (doi != NULL)
1825 		dmu_object_info_from_dnode(dn, doi);
1826 
1827 	dnode_rele(dn, FTAG);
1828 	return (0);
1829 }
1830 
1831 /*
1832  * As above, but faster; can be used when you have a held dbuf in hand.
1833  */
1834 void
1835 dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
1836 {
1837 	dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
1838 }
1839 
1840 /*
1841  * Faster still when you only care about the size.
1842  * This is specifically optimized for zfs_getattr().
1843  */
1844 void
1845 dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
1846 {
1847 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
1848 
1849 	*blksize = dn->dn_datablksz;
1850 	/* add 1 for dnode space */
1851 	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
1852 	    SPA_MINBLOCKSHIFT) + 1;
1853 }
1854 
1855 /*
1856  * Given a bookmark, return the name of the dataset, object, and range in
1857  * human-readable format.
1858  */
1859 int
1860 spa_bookmark_name(spa_t *spa, zbookmark_t *zb, nvlist_t *nvl)
1861 {
1862 	dsl_pool_t *dp;
1863 	dsl_dataset_t *ds = NULL;
1864 	objset_t *os = NULL;
1865 	dnode_t *dn = NULL;
1866 	int err, shift;
1867 	char dsname[MAXNAMELEN];
1868 	char objname[32];
1869 	char range[64];
1870 
1871 	dp = spa_get_dsl(spa);
1872 	if (zb->zb_objset != 0) {
1873 		rw_enter(&dp->dp_config_rwlock, RW_READER);
1874 		err = dsl_dataset_open_obj(dp, zb->zb_objset,
1875 		    NULL, DS_MODE_NONE, FTAG, &ds);
1876 		if (err) {
1877 			rw_exit(&dp->dp_config_rwlock);
1878 			return (err);
1879 		}
1880 		dsl_dataset_name(ds, dsname);
1881 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1882 		rw_exit(&dp->dp_config_rwlock);
1883 
1884 		err = dmu_objset_open(dsname, DMU_OST_ANY, DS_MODE_NONE, &os);
1885 		if (err)
1886 			goto out;
1887 
1888 	} else {
1889 		dsl_dataset_name(NULL, dsname);
1890 		os = dp->dp_meta_objset;
1891 	}
1892 
1893 
1894 	if (zb->zb_object == DMU_META_DNODE_OBJECT) {
1895 		(void) strncpy(objname, "mdn", sizeof (objname));
1896 	} else {
1897 		(void) snprintf(objname, sizeof (objname), "%lld",
1898 		    (longlong_t)zb->zb_object);
1899 	}
1900 
1901 	err = dnode_hold(os->os, zb->zb_object, FTAG, &dn);
1902 	if (err)
1903 		goto out;
1904 
1905 	shift = (dn->dn_datablkshift?dn->dn_datablkshift:SPA_MAXBLOCKSHIFT) +
1906 	    zb->zb_level * (dn->dn_indblkshift - SPA_BLKPTRSHIFT);
1907 	(void) snprintf(range, sizeof (range), "%llu-%llu",
1908 	    (u_longlong_t)(zb->zb_blkid << shift),
1909 	    (u_longlong_t)((zb->zb_blkid+1) << shift));
1910 
1911 	if ((err = nvlist_add_string(nvl, ZPOOL_ERR_DATASET, dsname)) != 0 ||
1912 	    (err = nvlist_add_string(nvl, ZPOOL_ERR_OBJECT, objname)) != 0 ||
1913 	    (err = nvlist_add_string(nvl, ZPOOL_ERR_RANGE, range)) != 0)
1914 		goto out;
1915 
1916 out:
1917 	if (dn)
1918 		dnode_rele(dn, FTAG);
1919 	if (os && os != dp->dp_meta_objset)
1920 		dmu_objset_close(os);
1921 	return (err);
1922 }
1923 
/*
 * Byteswap, in place, a buffer of 'size' bytes treated as an array of
 * uint64_t.  'size' must be a multiple of 8.
 */
void
byteswap_uint64_array(void *vbuf, size_t size)
{
	uint64_t *buf = vbuf;
	size_t count = size >> 3;
	size_t i;	/* same type as 'count': no signed/unsigned compare */

	ASSERT((size & 7) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_64(buf[i]);
}
1936 
/*
 * Byteswap, in place, a buffer of 'size' bytes treated as an array of
 * uint32_t.  'size' must be a multiple of 4.
 */
void
byteswap_uint32_array(void *vbuf, size_t size)
{
	uint32_t *buf = vbuf;
	size_t count = size >> 2;
	size_t i;	/* same type as 'count': no signed/unsigned compare */

	ASSERT((size & 3) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_32(buf[i]);
}
1949 
/*
 * Byteswap, in place, a buffer of 'size' bytes treated as an array of
 * uint16_t.  'size' must be a multiple of 2.
 */
void
byteswap_uint16_array(void *vbuf, size_t size)
{
	uint16_t *buf = vbuf;
	size_t count = size >> 1;
	size_t i;	/* same type as 'count': no signed/unsigned compare */

	ASSERT((size & 1) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_16(buf[i]);
}
1962 
/*
 * Byteswap routine for byte-granular data.  Deliberately a no-op: it
 * has the same signature as the other byteswap_*_array() routines but
 * leaves the buffer untouched.
 */
/* ARGSUSED */
void
byteswap_uint8_array(void *vbuf, size_t size)
{
}
1968 
/*
 * Initialize the DMU by calling dbuf_init(), dnode_init() and
 * arc_init(), in that order.  dmu_fini() undoes this in reverse.
 */
void
dmu_init(void)
{
	dbuf_init();
	dnode_init();
	arc_init();
}
1976 
/*
 * Tear down the DMU by calling arc_fini(), dnode_fini() and
 * dbuf_fini(), the reverse of the order used by dmu_init().
 */
void
dmu_fini(void)
{
	arc_fini();
	dnode_fini();
	dbuf_fini();
}
1984