xref: /illumos-gate/usr/src/uts/common/fs/zfs/dmu.c (revision a2eea2e101e6a163a537dcc6d4e3c4da2a0ea5b2)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dbuf.h>
32 #include <sys/dnode.h>
33 #include <sys/zfs_context.h>
34 #include <sys/dmu_objset.h>
35 #include <sys/dmu_traverse.h>
36 #include <sys/dsl_dataset.h>
37 #include <sys/dsl_dir.h>
38 #include <sys/dsl_pool.h>
39 #include <sys/dsl_synctask.h>
40 #include <sys/dsl_prop.h>
41 #include <sys/dmu_zfetch.h>
42 #include <sys/zfs_ioctl.h>
43 #include <sys/zap.h>
44 #include <sys/zio_checksum.h>
45 #ifdef _KERNEL
46 #include <sys/vmsystm.h>
47 #endif
48 
49 const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
50 	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
51 	{	zap_byteswap,		TRUE,	"object directory"	},
52 	{	byteswap_uint64_array,	TRUE,	"object array"		},
53 	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
54 	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
55 	{	byteswap_uint64_array,	TRUE,	"bplist"		},
56 	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
57 	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
58 	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
59 	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
60 	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
61 	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
62 	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
63 	{	zap_byteswap,		TRUE,	"DSL directory child map"},
64 	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
65 	{	zap_byteswap,		TRUE,	"DSL props"		},
66 	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
67 	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
68 	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
69 	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
70 	{	zap_byteswap,		TRUE,	"ZFS directory"		},
71 	{	zap_byteswap,		TRUE,	"ZFS master node"	},
72 	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
73 	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
74 	{	zap_byteswap,		TRUE,	"zvol prop"		},
75 	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
76 	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
77 	{	zap_byteswap,		TRUE,	"other ZAP"		},
78 	{	zap_byteswap,		TRUE,	"persistent error log"	},
79 };
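
/*
 * Illustrative sketch (editor's addition, not part of the original
 * source): dmu_ot[] is indexed by dmu_object_type_t, so byteswapping
 * an in-core buffer of a known object type (e.g. when reading a
 * foreign-endian pool) is a simple table dispatch:
 *
 *	ASSERT(type < DMU_OT_NUMTYPES);
 *	dmu_ot[type].ot_byteswap(buf, size);
 */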
80 
81 int
82 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
83     void *tag, dmu_buf_t **dbp)
84 {
85 	dnode_t *dn;
86 	uint64_t blkid;
87 	dmu_buf_impl_t *db;
88 	int err;
89 
90 	err = dnode_hold(os->os, object, FTAG, &dn);
91 	if (err)
92 		return (err);
93 	blkid = dbuf_whichblock(dn, offset);
94 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
95 	db = dbuf_hold(dn, blkid, tag);
96 	rw_exit(&dn->dn_struct_rwlock);
97 	if (db == NULL) {
98 		err = EIO;
99 	} else {
100 		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
101 		if (err) {
102 			dbuf_rele(db, tag);
103 			db = NULL;
104 		}
105 	}
106 
107 	dnode_rele(dn, FTAG);
108 	*dbp = (db != NULL) ? &db->db : NULL;
109 	return (err);
110 }
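
/*
 * Illustrative sketch (editor's addition): the basic single-buffer
 * access pattern, assuming the public dmu_buf_rele() wrapper around
 * dbuf_rele() as the release entry point:
 *
 *	dmu_buf_t *db;
 *	int err;
 *
 *	err = dmu_buf_hold(os, object, offset, FTAG, &db);
 *	if (err == 0) {
 *		... read up to db->db_size bytes at db->db_data ...
 *		dmu_buf_rele(db, FTAG);
 *	}
 */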
111 
112 int
113 dmu_bonus_max(void)
114 {
115 	return (DN_MAX_BONUSLEN);
116 }
117 
118 /*
119  * returns ENOENT, EIO, or 0.
120  */
121 int
122 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
123 {
124 	dnode_t *dn;
125 	int err, count;
126 	dmu_buf_impl_t *db;
127 
128 	err = dnode_hold(os->os, object, FTAG, &dn);
129 	if (err)
130 		return (err);
131 
132 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
133 	if (dn->dn_bonus == NULL) {
134 		rw_exit(&dn->dn_struct_rwlock);
135 		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
136 		if (dn->dn_bonus == NULL)
137 			dn->dn_bonus = dbuf_create_bonus(dn);
138 	}
139 	db = dn->dn_bonus;
140 	rw_exit(&dn->dn_struct_rwlock);
141 	mutex_enter(&db->db_mtx);
142 	count = refcount_add(&db->db_holds, tag);
143 	mutex_exit(&db->db_mtx);
144 	if (count == 1)
145 		dnode_add_ref(dn, db);
146 	dnode_rele(dn, FTAG);
147 
148 	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
149 
150 	*dbp = &db->db;
151 	return (0);
152 }
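
/*
 * Illustrative sketch (editor's addition): consumers keep per-object
 * metadata in the bonus buffer (e.g. the ZPL's znode_phys_t).  A
 * hedged access pattern, again assuming the dmu_buf_rele() wrapper:
 *
 *	dmu_buf_t *db;
 *
 *	if (dmu_bonus_hold(os, object, FTAG, &db) == 0) {
 *		... interpret db->db_data, db->db_size bytes ...
 *		dmu_buf_rele(db, FTAG);
 *	}
 */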
153 
154 /*
155  * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
156  * to take a held dnode rather than <os, object> -- the lookup is wasteful,
157  * and can induce severe lock contention when writing to several files
158  * whose dnodes are in the same block.
159  */
160 static int
161 dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset,
162     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
163 {
164 	dmu_buf_t **dbp;
165 	uint64_t blkid, nblks, i;
166 	uint32_t flags;
167 	int err;
168 	zio_t *zio;
169 
170 	ASSERT(length <= DMU_MAX_ACCESS);
171 
172 	flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT;
173 	if (length > zfetch_array_rd_sz)
174 		flags |= DB_RF_NOPREFETCH;
175 
176 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
177 	if (dn->dn_datablkshift) {
178 		int blkshift = dn->dn_datablkshift;
179 		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
180 			P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
181 	} else {
182 		ASSERT3U(offset + length, <=, dn->dn_datablksz);
183 		nblks = 1;
184 	}
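	/*
	 * Worked example (editor's addition): with 128K data blocks
	 * (blkshift == 17), offset == 102400 and length == 307200 give
	 * P2ALIGN(102400, 128K) == 0 and P2ROUNDUP(409600, 128K) == 512K,
	 * so nblks == 512K >> 17 == 4 buffers to hold.
	 */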
185 	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
186 
187 	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, TRUE);
188 	blkid = dbuf_whichblock(dn, offset);
189 	for (i = 0; i < nblks; i++) {
190 		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
191 		if (db == NULL) {
192 			rw_exit(&dn->dn_struct_rwlock);
193 			dmu_buf_rele_array(dbp, nblks, tag);
194 			zio_nowait(zio);
195 			return (EIO);
196 		}
197 		/* initiate async i/o */
198 		if (read) {
199 			rw_exit(&dn->dn_struct_rwlock);
200 			(void) dbuf_read(db, zio, flags);
201 			rw_enter(&dn->dn_struct_rwlock, RW_READER);
202 		}
203 		dbp[i] = &db->db;
204 	}
205 	rw_exit(&dn->dn_struct_rwlock);
206 
207 	/* wait for async i/o */
208 	err = zio_wait(zio);
209 	if (err) {
210 		dmu_buf_rele_array(dbp, nblks, tag);
211 		return (err);
212 	}
213 
214 	/* wait for other io to complete */
215 	if (read) {
216 		for (i = 0; i < nblks; i++) {
217 			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
218 			mutex_enter(&db->db_mtx);
219 			while (db->db_state == DB_READ ||
220 			    db->db_state == DB_FILL)
221 				cv_wait(&db->db_changed, &db->db_mtx);
222 			if (db->db_state == DB_UNCACHED)
223 				err = EIO;
224 			mutex_exit(&db->db_mtx);
225 			if (err) {
226 				dmu_buf_rele_array(dbp, nblks, tag);
227 				return (err);
228 			}
229 		}
230 	}
231 
232 	*numbufsp = nblks;
233 	*dbpp = dbp;
234 	return (0);
235 }
236 
237 static int
238 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
239     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
240 {
241 	dnode_t *dn;
242 	int err;
243 
244 	err = dnode_hold(os->os, object, FTAG, &dn);
245 	if (err)
246 		return (err);
247 
248 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
249 	    numbufsp, dbpp);
250 
251 	dnode_rele(dn, FTAG);
252 
253 	return (err);
254 }
255 
256 int
257 dmu_buf_hold_array_by_bonus(dmu_buf_t *db, uint64_t offset,
258     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
259 {
260 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
261 	int err;
262 
263 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
264 	    numbufsp, dbpp);
265 
266 	return (err);
267 }
268 
269 void
270 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
271 {
272 	int i;
273 	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
274 
275 	if (numbufs == 0)
276 		return;
277 
278 	for (i = 0; i < numbufs; i++) {
279 		if (dbp[i])
280 			dbuf_rele(dbp[i], tag);
281 	}
282 
283 	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
284 }
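
/*
 * Illustrative sketch (editor's addition): every successful
 * dmu_buf_hold_array() (static in this file; see dmu_write() below
 * for a real caller) must be balanced by one dmu_buf_rele_array()
 * under the same tag:
 *
 *	dmu_buf_t **dbp;
 *	int numbufs;
 *
 *	if (dmu_buf_hold_array(os, object, offset, size,
 *	    TRUE, FTAG, &numbufs, &dbp) == 0) {
 *		... use dbp[0] through dbp[numbufs - 1] ...
 *		dmu_buf_rele_array(dbp, numbufs, FTAG);
 *	}
 */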
285 
286 void
287 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
288 {
289 	dnode_t *dn;
290 	uint64_t blkid;
291 	int nblks, i, err;
292 
293 	if (len == 0) {  /* they're interested in the bonus buffer */
294 		dn = os->os->os_meta_dnode;
295 
296 		if (object == 0 || object >= DN_MAX_OBJECT)
297 			return;
298 
299 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
300 		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
301 		dbuf_prefetch(dn, blkid);
302 		rw_exit(&dn->dn_struct_rwlock);
303 		return;
304 	}
305 
306 	/*
307 	 * XXX - Note, if the dnode for the requested object is not
308 	 * already cached, we will do a *synchronous* read in the
309 	 * dnode_hold() call.  The same is true for any indirects.
310 	 */
311 	err = dnode_hold(os->os, object, FTAG, &dn);
312 	if (err != 0)
313 		return;
314 
315 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
316 	if (dn->dn_datablkshift) {
317 		int blkshift = dn->dn_datablkshift;
318 		nblks = (P2ROUNDUP(offset+len, 1ULL<<blkshift) -
319 			P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
320 	} else {
321 		nblks = (offset < dn->dn_datablksz);
322 	}
323 
324 	if (nblks != 0) {
325 		blkid = dbuf_whichblock(dn, offset);
326 		for (i = 0; i < nblks; i++)
327 			dbuf_prefetch(dn, blkid+i);
328 	}
329 
330 	rw_exit(&dn->dn_struct_rwlock);
331 
332 	dnode_rele(dn, FTAG);
333 }
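
/*
 * Illustrative sketch (editor's addition): a sequential reader can ask
 * for read-ahead before consuming a region; the len == 0 form instead
 * prefetches the dnode block itself (useful before bonus access):
 *
 *	dmu_prefetch(os, object, next_offset, readahead_len);
 *	dmu_prefetch(os, object, 0, 0);
 */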
334 
335 int
336 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
337     uint64_t size, dmu_tx_t *tx)
338 {
339 	dnode_t *dn;
340 	int err = dnode_hold(os->os, object, FTAG, &dn);
341 	if (err)
342 		return (err);
343 	ASSERT(offset < UINT64_MAX);
344 	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
345 	dnode_free_range(dn, offset, size, tx);
346 	dnode_rele(dn, FTAG);
347 	return (0);
348 }
349 
350 int
351 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
352     void *buf)
353 {
354 	dnode_t *dn;
355 	dmu_buf_t **dbp;
356 	int numbufs, i, err;
357 
358 	/*
359 	 * Deal with odd block sizes, where there can't be data past the
360 	 * first block.
361 	 */
362 	err = dnode_hold(os->os, object, FTAG, &dn);
363 	if (err)
364 		return (err);
365 	if (dn->dn_datablkshift == 0) {
366 		int newsz = offset > dn->dn_datablksz ? 0 :
367 		    MIN(size, dn->dn_datablksz - offset);
368 		bzero((char *)buf + newsz, size - newsz);
369 		size = newsz;
370 	}
371 
372 	while (size > 0) {
373 		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
375 
376 		/*
377 		 * NB: we could do this block-at-a-time, but it's nice
378 		 * to be reading in parallel.
379 		 */
380 		err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
381 		    TRUE, FTAG, &numbufs, &dbp);
382 		if (err)
383 			break;	/* relinquish the dnode hold on the way out */
384 
385 		for (i = 0; i < numbufs; i++) {
386 			int tocpy;
387 			int bufoff;
388 			dmu_buf_t *db = dbp[i];
389 
390 			ASSERT(size > 0);
391 
392 			bufoff = offset - db->db_offset;
393 			tocpy = (int)MIN(db->db_size - bufoff, size);
394 
395 			bcopy((char *)db->db_data + bufoff, buf, tocpy);
396 
397 			offset += tocpy;
398 			size -= tocpy;
399 			buf = (char *)buf + tocpy;
400 		}
401 		dmu_buf_rele_array(dbp, numbufs, FTAG);
402 	}
403 	dnode_rele(dn, FTAG);
404 	return (err);
405 }
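
/*
 * Illustrative sketch (editor's addition): dmu_read() is the
 * synchronous, copying read path; no transaction is needed:
 *
 *	char *buf = kmem_alloc(size, KM_SLEEP);
 *	int err = dmu_read(os, object, offset, size, buf);
 *
 *	... on err != 0 (e.g. EIO) the buffer contents are undefined ...
 */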
406 
407 void
408 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
409     const void *buf, dmu_tx_t *tx)
410 {
411 	dmu_buf_t **dbp;
412 	int numbufs, i;
413 
414 	if (size == 0)
415 		return;
416 
417 	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
418 	    FALSE, FTAG, &numbufs, &dbp));
419 
420 	for (i = 0; i < numbufs; i++) {
421 		int tocpy;
422 		int bufoff;
423 		dmu_buf_t *db = dbp[i];
424 
425 		ASSERT(size > 0);
426 
427 		bufoff = offset - db->db_offset;
428 		tocpy = (int)MIN(db->db_size - bufoff, size);
429 
430 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
431 
432 		if (tocpy == db->db_size)
433 			dmu_buf_will_fill(db, tx);
434 		else
435 			dmu_buf_will_dirty(db, tx);
436 
437 		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
438 
439 		if (tocpy == db->db_size)
440 			dmu_buf_fill_done(db, tx);
441 
442 		offset += tocpy;
443 		size -= tocpy;
444 		buf = (char *)buf + tocpy;
445 	}
446 	dmu_buf_rele_array(dbp, numbufs, FTAG);
447 }
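
/*
 * Illustrative sketch (editor's addition): dmu_write() requires an
 * assigned transaction that covers the range.  A hedged outline of
 * the usual caller sequence:
 *
 *	dmu_tx_t *tx = dmu_tx_create(os);
 *
 *	dmu_tx_hold_write(tx, object, offset, size);
 *	err = dmu_tx_assign(tx, TXG_WAIT);
 *	if (err) {
 *		dmu_tx_abort(tx);
 *		return (err);
 *	}
 *	dmu_write(os, object, offset, size, buf, tx);
 *	dmu_tx_commit(tx);
 */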
448 
449 #ifdef _KERNEL
450 int
451 dmu_write_uio(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
452     uio_t *uio, dmu_tx_t *tx)
453 {
454 	dmu_buf_t **dbp;
455 	int numbufs, i;
456 	int err = 0;
457 
458 	if (size == 0)
459 		return (0);
460 
461 	err = dmu_buf_hold_array(os, object, offset, size,
462 	    FALSE, FTAG, &numbufs, &dbp);
463 	if (err)
464 		return (err);
465 
466 	for (i = 0; i < numbufs; i++) {
467 		int tocpy;
468 		int bufoff;
469 		dmu_buf_t *db = dbp[i];
470 
471 		ASSERT(size > 0);
472 
473 		bufoff = offset - db->db_offset;
474 		tocpy = (int)MIN(db->db_size - bufoff, size);
475 
476 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
477 
478 		if (tocpy == db->db_size)
479 			dmu_buf_will_fill(db, tx);
480 		else
481 			dmu_buf_will_dirty(db, tx);
482 
483 		/*
484 		 * XXX uiomove could block forever (eg. nfs-backed
485 		 * pages).  There needs to be a uiolockdown() function
486 		 * to lock the pages in memory, so that uiomove won't
487 		 * block.
488 		 */
489 		err = uiomove((char *)db->db_data + bufoff, tocpy,
490 		    UIO_WRITE, uio);
491 
492 		if (tocpy == db->db_size)
493 			dmu_buf_fill_done(db, tx);
494 
495 		if (err)
496 			break;
497 
498 		offset += tocpy;
499 		size -= tocpy;
500 	}
501 	dmu_buf_rele_array(dbp, numbufs, FTAG);
502 	return (err);
503 }
504 
505 int
506 dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
507     page_t *pp, dmu_tx_t *tx)
508 {
509 	dmu_buf_t **dbp;
510 	int numbufs, i;
511 	int err;
512 
513 	if (size == 0)
514 		return (0);
515 
516 	err = dmu_buf_hold_array(os, object, offset, size,
517 	    FALSE, FTAG, &numbufs, &dbp);
518 	if (err)
519 		return (err);
520 
521 	for (i = 0; i < numbufs; i++) {
522 		int tocpy, copied, thiscpy;
523 		int bufoff;
524 		dmu_buf_t *db = dbp[i];
525 		caddr_t va;
526 
527 		ASSERT(size > 0);
528 		ASSERT3U(db->db_size, >=, PAGESIZE);
529 
530 		bufoff = offset - db->db_offset;
531 		tocpy = (int)MIN(db->db_size - bufoff, size);
532 
533 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
534 
535 		if (tocpy == db->db_size)
536 			dmu_buf_will_fill(db, tx);
537 		else
538 			dmu_buf_will_dirty(db, tx);
539 
540 		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
541 			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
542 			thiscpy = MIN(PAGESIZE, tocpy - copied);
543 			va = ppmapin(pp, PROT_READ, (caddr_t)-1);
544 			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
545 			ppmapout(va);
546 			pp = pp->p_next;
547 			bufoff += PAGESIZE;
548 		}
549 
550 		if (tocpy == db->db_size)
551 			dmu_buf_fill_done(db, tx);
552 
553 		if (err)
554 			break;
555 
556 		offset += tocpy;
557 		size -= tocpy;
558 	}
559 	dmu_buf_rele_array(dbp, numbufs, FTAG);
560 	return (err);
561 }
562 #endif
563 
564 typedef struct {
565 	uint64_t	txg;
566 	dmu_buf_impl_t	*db;
567 	dmu_sync_cb_t	*done;
568 	void		*arg;
569 } dmu_sync_cbin_t;
570 
571 typedef union {
572 	dmu_sync_cbin_t	data;
573 	blkptr_t	blk;
574 } dmu_sync_cbarg_t;
575 
576 /* ARGSUSED */
577 static void
578 dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
579 {
580 	dmu_sync_cbin_t *in = (dmu_sync_cbin_t *)varg;
581 	dmu_buf_impl_t *db = in->db;
582 	uint64_t txg = in->txg;
583 	dmu_sync_cb_t *done = in->done;
584 	void *arg = in->arg;
585 	blkptr_t *blk = (blkptr_t *)varg;
586 
587 	if (!BP_IS_HOLE(zio->io_bp)) {
588 		zio->io_bp->blk_fill = 1;
589 		BP_SET_TYPE(zio->io_bp, db->db_dnode->dn_type);
590 		BP_SET_LEVEL(zio->io_bp, 0);
591 	}
592 
593 	*blk = *zio->io_bp; /* structure assignment */
594 
595 	mutex_enter(&db->db_mtx);
596 	ASSERT(db->db_d.db_overridden_by[txg&TXG_MASK] == IN_DMU_SYNC);
597 	db->db_d.db_overridden_by[txg&TXG_MASK] = blk;
598 	cv_broadcast(&db->db_changed);
599 	mutex_exit(&db->db_mtx);
600 
601 	if (done)
602 		done(&(db->db), arg);
603 }
604 
605 /*
606  * Intent log support: sync the block associated with db to disk.
607  * N.B. and XXX: the caller is responsible for making sure that the
608  * data isn't changing while dmu_sync() is writing it.
609  *
610  * Return values:
611  *
612  *	EEXIST: this txg has already been synced, so there's nothing to do.
613  *		The caller should not log the write.
614  *
615  *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
616  *		The caller should not log the write.
617  *
618  *	EALREADY: this block is already in the process of being synced.
619  *		The caller should track its progress (somehow).
620  *
621  *	EINPROGRESS: the IO has been initiated.
622  *		The caller should log this blkptr in the callback.
623  *
624  *	0: completed.  Sets *bp to the blkptr just written.
625  *		The caller should log this blkptr immediately.
626  */
627 int
628 dmu_sync(zio_t *pio, dmu_buf_t *db_fake,
629     blkptr_t *bp, uint64_t txg, dmu_sync_cb_t *done, void *arg)
630 {
631 	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
632 	objset_impl_t *os = db->db_objset;
633 	dsl_pool_t *dp = os->os_dsl_dataset->ds_dir->dd_pool;
634 	tx_state_t *tx = &dp->dp_tx;
635 	dmu_sync_cbin_t *in;
636 	blkptr_t *blk;
637 	zbookmark_t zb;
638 	uint32_t arc_flag;
639 	int err;
640 
641 	ASSERT(BP_IS_HOLE(bp));
642 	ASSERT(txg != 0);
643 
645 	dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
646 	    txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);
647 
648 	/*
649 	 * XXX - would be nice if we could do this without suspending...
650 	 */
651 	txg_suspend(dp);
652 
653 	/*
654 	 * If this txg already synced, there's nothing to do.
655 	 */
656 	if (txg <= tx->tx_synced_txg) {
657 		txg_resume(dp);
658 		/*
659 		 * If we're running ziltest, we need the blkptr regardless.
660 		 */
661 		if (txg > spa_freeze_txg(dp->dp_spa)) {
662 			/* if db_blkptr == NULL, this was an empty write */
663 			if (db->db_blkptr)
664 				*bp = *db->db_blkptr; /* structure assignment */
665 			return (0);
666 		}
667 		return (EEXIST);
668 	}
669 
670 	mutex_enter(&db->db_mtx);
671 
672 	blk = db->db_d.db_overridden_by[txg&TXG_MASK];
673 	if (blk == IN_DMU_SYNC) {
674 		/*
675 		 * We have already issued a sync write for this buffer.
676 		 */
677 		mutex_exit(&db->db_mtx);
678 		txg_resume(dp);
679 		return (EALREADY);
680 	} else if (blk != NULL) {
681 		/*
682 		 * This buffer had already been synced.  It could not
683 		 * have been dirtied since, or we would have cleared blk.
684 		 */
685 		*bp = *blk; /* structure assignment */
686 		mutex_exit(&db->db_mtx);
687 		txg_resume(dp);
688 		return (0);
689 	}
690 
691 	if (txg == tx->tx_syncing_txg) {
692 		while (db->db_data_pending) {
693 			/*
694 			 * IO is in-progress.  Wait for it to finish.
695 			 * XXX - would be nice to be able to somehow "attach"
696 			 * this zio to the parent zio passed in.
697 			 */
698 			cv_wait(&db->db_changed, &db->db_mtx);
699 			if (!db->db_data_pending &&
700 			    db->db_blkptr && BP_IS_HOLE(db->db_blkptr)) {
701 				/*
702 				 * IO was compressed away
703 				 */
704 				*bp = *db->db_blkptr; /* structure assignment */
705 				mutex_exit(&db->db_mtx);
706 				txg_resume(dp);
707 				return (0);
708 			}
709 			ASSERT(db->db_data_pending ||
710 			    (db->db_blkptr && db->db_blkptr->blk_birth == txg));
711 		}
712 
713 		if (db->db_blkptr && db->db_blkptr->blk_birth == txg) {
714 			/*
715 			 * IO is already completed.
716 			 */
717 			*bp = *db->db_blkptr; /* structure assignment */
718 			mutex_exit(&db->db_mtx);
719 			txg_resume(dp);
720 			return (0);
721 		}
722 	}
723 
724 	if (db->db_d.db_data_old[txg&TXG_MASK] == NULL) {
725 		/*
726 		 * This dbuf isn't dirty, must have been free_range'd.
727 		 * There's no need to log writes to freed blocks, so we're done.
728 		 */
729 		mutex_exit(&db->db_mtx);
730 		txg_resume(dp);
731 		return (ENOENT);
732 	}
733 
734 	ASSERT(db->db_d.db_overridden_by[txg&TXG_MASK] == NULL);
735 	db->db_d.db_overridden_by[txg&TXG_MASK] = IN_DMU_SYNC;
736 	/*
737 	 * XXX - a little ugly to stash the blkptr in the callback
738 	 * buffer.  We always need to make sure the following is true:
739 	 * ASSERT(sizeof(blkptr_t) >= sizeof(dmu_sync_cbin_t));
740 	 */
741 	in = kmem_alloc(sizeof (blkptr_t), KM_SLEEP);
742 	in->db = db;
743 	in->txg = txg;
744 	in->done = done;
745 	in->arg = arg;
746 	mutex_exit(&db->db_mtx);
747 	txg_resume(dp);
748 
749 	arc_flag = pio == NULL ? ARC_WAIT : ARC_NOWAIT;
750 	zb.zb_objset = os->os_dsl_dataset->ds_object;
751 	zb.zb_object = db->db.db_object;
752 	zb.zb_level = db->db_level;
753 	zb.zb_blkid = db->db_blkid;
754 	err = arc_write(pio, os->os_spa,
755 	    zio_checksum_select(db->db_dnode->dn_checksum, os->os_checksum),
756 	    zio_compress_select(db->db_dnode->dn_compress, os->os_compress),
757 	    dmu_get_replication_level(os->os_spa, &zb, db->db_dnode->dn_type),
758 	    txg, bp, db->db_d.db_data_old[txg&TXG_MASK], dmu_sync_done, in,
759 	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, arc_flag, &zb);
760 	ASSERT(err == 0);
761 
762 	return (arc_flag == ARC_NOWAIT ? EINPROGRESS : 0);
763 }
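
/*
 * Illustrative sketch (editor's addition): how an intent-log writer
 * might dispatch on the return values documented above (the names
 * lr and lwb are hypothetical, not the actual ZIL code):
 *
 *	switch (dmu_sync(pio, db, &lr->lr_blkptr, txg, done_cb, lwb)) {
 *	case 0:
 *		... blkptr is valid; log the write immediately ...
 *		break;
 *	case EINPROGRESS:
 *		... done_cb will see the blkptr; log it from there ...
 *		break;
 *	case EALREADY:
 *		... another dmu_sync() of this block is in flight ...
 *		break;
 *	case EEXIST:
 *	case ENOENT:
 *		... do not log the write ...
 *		break;
 *	}
 */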
764 
765 int
766 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
767 	dmu_tx_t *tx)
768 {
769 	dnode_t *dn;
770 	int err;
771 
772 	err = dnode_hold(os->os, object, FTAG, &dn);
773 	if (err)
774 		return (err);
775 	err = dnode_set_blksz(dn, size, ibs, tx);
776 	dnode_rele(dn, FTAG);
777 	return (err);
778 }
779 
780 void
781 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
782 	dmu_tx_t *tx)
783 {
784 	dnode_t *dn;
785 
786 	/* XXX assumes dnode_hold will not get an i/o error */
787 	(void) dnode_hold(os->os, object, FTAG, &dn);
788 	ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
789 	dn->dn_checksum = checksum;
790 	dnode_setdirty(dn, tx);
791 	dnode_rele(dn, FTAG);
792 }
793 
794 void
795 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
796 	dmu_tx_t *tx)
797 {
798 	dnode_t *dn;
799 
800 	/* XXX assumes dnode_hold will not get an i/o error */
801 	(void) dnode_hold(os->os, object, FTAG, &dn);
802 	ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
803 	dn->dn_compress = compress;
804 	dnode_setdirty(dn, tx);
805 	dnode_rele(dn, FTAG);
806 }
807 
808 /*
809  * XXX - eventually, this should take into account per-dataset (or
810  *       even per-object?) user requests for higher levels of replication.
811  */
812 int
813 dmu_get_replication_level(spa_t *spa, zbookmark_t *zb, dmu_object_type_t ot)
814 {
815 	int ncopies = 1;
816 
817 	if (dmu_ot[ot].ot_metadata)
818 		ncopies++;
819 	if (zb->zb_level != 0)
820 		ncopies++;
821 	if (zb->zb_objset == 0 && zb->zb_object == 0)
822 		ncopies++;
823 	return (MIN(ncopies, spa_max_replication(spa)));
824 }
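
/*
 * Worked example (editor's addition): a level-1 indirect block of a
 * "ZFS plain file" starts at ncopies = 1 and gains one for
 * zb_level != 0 (plain file contents are not ot_metadata), giving 2;
 * a level-0 block of the MOS meta-dnode (zb_objset == 0,
 * zb_object == 0, type "DMU dnode") gets 1 + 1 (metadata) + 1
 * (pool-wide metadata) = 3.  Both are capped by spa_max_replication().
 */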
825 
826 int
827 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
828 {
829 	dnode_t *dn;
830 	int i, err;
831 
832 	err = dnode_hold(os->os, object, FTAG, &dn);
833 	if (err)
834 		return (err);
835 	/*
836 	 * Sync any current changes before
837 	 * we go trundling through the block pointers.
838 	 */
839 	for (i = 0; i < TXG_SIZE; i++) {
840 		if (list_link_active(&dn->dn_dirty_link[i]))
841 			break;
842 	}
843 	if (i != TXG_SIZE) {
844 		dnode_rele(dn, FTAG);
845 		txg_wait_synced(dmu_objset_pool(os), 0);
846 		err = dnode_hold(os->os, object, FTAG, &dn);
847 		if (err)
848 			return (err);
849 	}
850 
851 	err = dnode_next_offset(dn, hole, off, 1, 1);
852 	dnode_rele(dn, FTAG);
853 
854 	return (err);
855 }
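
/*
 * Illustrative sketch (editor's addition): finding the next hole in a
 * file, assuming the usual convention that dnode_next_offset() returns
 * ESRCH when there is nothing further to report:
 *
 *	uint64_t off = start;
 *
 *	err = dmu_offset_next(os, object, B_TRUE, &off);
 *	if (err == 0)
 *		... off now points at the start of the next hole ...
 */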
856 
857 void
858 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
859 {
860 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
861 	mutex_enter(&dn->dn_mtx);
862 
863 	doi->doi_data_block_size = dn->dn_datablksz;
864 	doi->doi_metadata_block_size = dn->dn_indblkshift ?
865 	    1ULL << dn->dn_indblkshift : 0;
866 	doi->doi_indirection = dn->dn_nlevels;
867 	doi->doi_checksum = dn->dn_checksum;
868 	doi->doi_compress = dn->dn_compress;
869 	doi->doi_physical_blks = (DN_USED_BYTES(dn->dn_phys) +
870 	    SPA_MINBLOCKSIZE/2) >> SPA_MINBLOCKSHIFT;
871 	doi->doi_max_block_offset = dn->dn_phys->dn_maxblkid;
872 	doi->doi_type = dn->dn_type;
873 	doi->doi_bonus_size = dn->dn_bonuslen;
874 	doi->doi_bonus_type = dn->dn_bonustype;
875 
876 	mutex_exit(&dn->dn_mtx);
877 	rw_exit(&dn->dn_struct_rwlock);
878 }
879 
880 /*
881  * Get information on a DMU object.
882  * If doi is NULL, the call just checks whether the object exists.
883  */
884 int
885 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
886 {
887 	dnode_t *dn;
888 	int err = dnode_hold(os->os, object, FTAG, &dn);
889 
890 	if (err)
891 		return (err);
892 
893 	if (doi != NULL)
894 		dmu_object_info_from_dnode(dn, doi);
895 
896 	dnode_rele(dn, FTAG);
897 	return (0);
898 }
899 
900 /*
901  * As above, but faster; can be used when you have a held dbuf in hand.
902  */
903 void
904 dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
905 {
906 	dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
907 }
908 
909 /*
910  * Faster still when you only care about the size.
911  * This is specifically optimized for zfs_getattr().
912  */
913 void
914 dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
915 {
916 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
917 
918 	*blksize = dn->dn_datablksz;
919 	/* add 1 for dnode space */
920 	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
921 	    SPA_MINBLOCKSHIFT) + 1;
922 }
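
/*
 * Worked example (editor's addition): with DN_USED_BYTES() == 10240,
 * (10240 + 256) >> 9 rounds to 20 512-byte units, and the +1 for the
 * dnode itself yields *nblk512 == 21.
 */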
923 
924 /*
925  * Given a bookmark, return the name of the dataset, object, and range in
926  * human-readable format.
927  */
928 int
929 spa_bookmark_name(spa_t *spa, zbookmark_t *zb, nvlist_t *nvl)
930 {
931 	dsl_pool_t *dp;
932 	dsl_dataset_t *ds = NULL;
933 	objset_t *os = NULL;
934 	dnode_t *dn = NULL;
935 	int err, shift;
936 	char dsname[MAXNAMELEN];
937 	char objname[32];
938 	char range[64];
939 
940 	dp = spa_get_dsl(spa);
941 	if (zb->zb_objset != 0) {
942 		rw_enter(&dp->dp_config_rwlock, RW_READER);
943 		err = dsl_dataset_open_obj(dp, zb->zb_objset,
944 		    NULL, DS_MODE_NONE, FTAG, &ds);
945 		if (err) {
946 			rw_exit(&dp->dp_config_rwlock);
947 			return (err);
948 		}
949 		dsl_dataset_name(ds, dsname);
950 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
951 		rw_exit(&dp->dp_config_rwlock);
952 
953 		err = dmu_objset_open(dsname, DMU_OST_ANY, DS_MODE_NONE, &os);
954 		if (err)
955 			goto out;
956 
957 	} else {
958 		dsl_dataset_name(NULL, dsname);
959 		os = dp->dp_meta_objset;
960 	}
961 
963 	if (zb->zb_object == DMU_META_DNODE_OBJECT) {
964 		(void) strncpy(objname, "mdn", sizeof (objname));
965 	} else {
966 		(void) snprintf(objname, sizeof (objname), "%lld",
967 		    (longlong_t)zb->zb_object);
968 	}
969 
970 	err = dnode_hold(os->os, zb->zb_object, FTAG, &dn);
971 	if (err)
972 		goto out;
973 
974 	shift = (dn->dn_datablkshift?dn->dn_datablkshift:SPA_MAXBLOCKSHIFT) +
975 	    zb->zb_level * (dn->dn_indblkshift - SPA_BLKPTRSHIFT);
976 	(void) snprintf(range, sizeof (range), "%llu-%llu",
977 	    (u_longlong_t)(zb->zb_blkid << shift),
978 	    (u_longlong_t)((zb->zb_blkid+1) << shift));
979 
980 	if ((err = nvlist_add_string(nvl, ZPOOL_ERR_DATASET, dsname)) != 0 ||
981 	    (err = nvlist_add_string(nvl, ZPOOL_ERR_OBJECT, objname)) != 0 ||
982 	    (err = nvlist_add_string(nvl, ZPOOL_ERR_RANGE, range)) != 0)
983 		goto out;
984 
985 out:
986 	if (dn)
987 		dnode_rele(dn, FTAG);
988 	if (os && os != dp->dp_meta_objset)
989 		dmu_objset_close(os);
990 	return (err);
991 }
992 
993 void
994 byteswap_uint64_array(void *vbuf, size_t size)
995 {
996 	uint64_t *buf = vbuf;
997 	size_t count = size >> 3;
998 	int i;
999 
1000 	ASSERT((size & 7) == 0);
1001 
1002 	for (i = 0; i < count; i++)
1003 		buf[i] = BSWAP_64(buf[i]);
1004 }
1005 
1006 void
1007 byteswap_uint32_array(void *vbuf, size_t size)
1008 {
1009 	uint32_t *buf = vbuf;
1010 	size_t count = size >> 2;
1011 	int i;
1012 
1013 	ASSERT((size & 3) == 0);
1014 
1015 	for (i = 0; i < count; i++)
1016 		buf[i] = BSWAP_32(buf[i]);
1017 }
1018 
1019 void
1020 byteswap_uint16_array(void *vbuf, size_t size)
1021 {
1022 	uint16_t *buf = vbuf;
1023 	size_t count = size >> 1;
1024 	int i;
1025 
1026 	ASSERT((size & 1) == 0);
1027 
1028 	for (i = 0; i < count; i++)
1029 		buf[i] = BSWAP_16(buf[i]);
1030 }
1031 
1032 /* ARGSUSED */
1033 void
1034 byteswap_uint8_array(void *vbuf, size_t size)
1035 {
1036 }
1037 
1038 void
1039 dmu_init(void)
1040 {
1041 	dbuf_init();
1042 	dnode_init();
1043 	arc_init();
1044 }
1045 
1046 void
1047 dmu_fini(void)
1048 {
1049 	arc_fini();
1050 	dnode_fini();
1051 	dbuf_fini();
1052 }
1053