xref: /illumos-gate/usr/src/uts/common/fs/zfs/dmu.c (revision 06eeb2ad640ce72d394ac521094bed7681044408)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dbuf.h>
32 #include <sys/dnode.h>
33 #include <sys/zfs_context.h>
34 #include <sys/dmu_objset.h>
35 #include <sys/dmu_traverse.h>
36 #include <sys/dsl_dataset.h>
37 #include <sys/dsl_dir.h>
38 #include <sys/dsl_pool.h>
39 #include <sys/dsl_synctask.h>
40 #include <sys/dsl_prop.h>
41 #include <sys/dmu_zfetch.h>
42 #include <sys/zfs_ioctl.h>
43 #include <sys/zap.h>
44 #include <sys/zio_checksum.h>
45 #ifdef _KERNEL
46 #include <sys/vmsystm.h>
47 #endif
48 
/*
 * Per-object-type information table with DMU_OT_NUMTYPES entries.
 * dmu_get_replication_level() indexes it by dmu_object_type_t and reads
 * the ot_metadata member.
 *
 * Each row is a positional initializer with three values:
 *   1. a byteswap function for buffers of that type (inferred from the
 *      function names; the struct is declared elsewhere -- TODO confirm),
 *   2. a TRUE/FALSE flag, presumably the ot_metadata member read by
 *      dmu_get_replication_level(),
 *   3. a human-readable name for the type.
 *
 * NOTE(review): row order presumably has to match the dmu_object_type_t
 * enumeration, which is not visible here -- confirm before reordering.
 */
const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
	{	zap_byteswap,		TRUE,	"object directory"	},
	{	byteswap_uint64_array,	TRUE,	"object array"		},
	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
	{	byteswap_uint64_array,	TRUE,	"bplist"		},
	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
	{	zap_byteswap,		TRUE,	"DSL directory child map"},
	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
	{	zap_byteswap,		TRUE,	"DSL props"		},
	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
	{	zap_byteswap,		TRUE,	"ZFS directory"		},
	{	zap_byteswap,		TRUE,	"ZFS master node"	},
	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
	{	zap_byteswap,		TRUE,	"zvol prop"		},
	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
	{	zap_byteswap,		TRUE,	"other ZAP"		},
	{	zap_byteswap,		TRUE,	"persistent error log"	},
	{	byteswap_uint8_array,	TRUE,	"SPA history"		},
	{	byteswap_uint64_array,	TRUE,	"SPA history offsets"	},
};
82 
83 int
84 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
85     void *tag, dmu_buf_t **dbp)
86 {
87 	dnode_t *dn;
88 	uint64_t blkid;
89 	dmu_buf_impl_t *db;
90 	int err;
91 
92 	err = dnode_hold(os->os, object, FTAG, &dn);
93 	if (err)
94 		return (err);
95 	blkid = dbuf_whichblock(dn, offset);
96 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
97 	db = dbuf_hold(dn, blkid, tag);
98 	rw_exit(&dn->dn_struct_rwlock);
99 	if (db == NULL) {
100 		err = EIO;
101 	} else {
102 		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
103 		if (err) {
104 			dbuf_rele(db, tag);
105 			db = NULL;
106 		}
107 	}
108 
109 	dnode_rele(dn, FTAG);
110 	*dbp = &db->db;
111 	return (err);
112 }
113 
114 int
115 dmu_bonus_max(void)
116 {
117 	return (DN_MAX_BONUSLEN);
118 }
119 
120 /*
121  * returns ENOENT, EIO, or 0.
122  */
123 int
124 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
125 {
126 	dnode_t *dn;
127 	int err, count;
128 	dmu_buf_impl_t *db;
129 
130 	err = dnode_hold(os->os, object, FTAG, &dn);
131 	if (err)
132 		return (err);
133 
134 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
135 	if (dn->dn_bonus == NULL) {
136 		rw_exit(&dn->dn_struct_rwlock);
137 		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
138 		if (dn->dn_bonus == NULL)
139 			dn->dn_bonus = dbuf_create_bonus(dn);
140 	}
141 	db = dn->dn_bonus;
142 	rw_exit(&dn->dn_struct_rwlock);
143 	mutex_enter(&db->db_mtx);
144 	count = refcount_add(&db->db_holds, tag);
145 	mutex_exit(&db->db_mtx);
146 	if (count == 1)
147 		dnode_add_ref(dn, db);
148 	dnode_rele(dn, FTAG);
149 
150 	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
151 
152 	*dbp = &db->db;
153 	return (0);
154 }
155 
156 /*
157  * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
158  * to take a held dnode rather than <os, object> -- the lookup is wasteful,
159  * and can induce severe lock contention when writing to several files
160  * whose dnodes are in the same block.
161  */
162 static int
163 dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset,
164     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
165 {
166 	dmu_buf_t **dbp;
167 	uint64_t blkid, nblks, i;
168 	uint32_t flags;
169 	int err;
170 	zio_t *zio;
171 
172 	ASSERT(length <= DMU_MAX_ACCESS);
173 
174 	flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT;
175 	if (length > zfetch_array_rd_sz)
176 		flags |= DB_RF_NOPREFETCH;
177 
178 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
179 	if (dn->dn_datablkshift) {
180 		int blkshift = dn->dn_datablkshift;
181 		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
182 			P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
183 	} else {
184 		ASSERT3U(offset + length, <=, dn->dn_datablksz);
185 		nblks = 1;
186 	}
187 	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
188 
189 	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, TRUE);
190 	blkid = dbuf_whichblock(dn, offset);
191 	for (i = 0; i < nblks; i++) {
192 		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
193 		if (db == NULL) {
194 			rw_exit(&dn->dn_struct_rwlock);
195 			dmu_buf_rele_array(dbp, nblks, tag);
196 			zio_nowait(zio);
197 			return (EIO);
198 		}
199 		/* initiate async i/o */
200 		if (read) {
201 			rw_exit(&dn->dn_struct_rwlock);
202 			(void) dbuf_read(db, zio, flags);
203 			rw_enter(&dn->dn_struct_rwlock, RW_READER);
204 		}
205 		dbp[i] = &db->db;
206 	}
207 	rw_exit(&dn->dn_struct_rwlock);
208 
209 	/* wait for async i/o */
210 	err = zio_wait(zio);
211 	if (err) {
212 		dmu_buf_rele_array(dbp, nblks, tag);
213 		return (err);
214 	}
215 
216 	/* wait for other io to complete */
217 	if (read) {
218 		for (i = 0; i < nblks; i++) {
219 			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
220 			mutex_enter(&db->db_mtx);
221 			while (db->db_state == DB_READ ||
222 			    db->db_state == DB_FILL)
223 				cv_wait(&db->db_changed, &db->db_mtx);
224 			if (db->db_state == DB_UNCACHED)
225 				err = EIO;
226 			mutex_exit(&db->db_mtx);
227 			if (err) {
228 				dmu_buf_rele_array(dbp, nblks, tag);
229 				return (err);
230 			}
231 		}
232 	}
233 
234 	*numbufsp = nblks;
235 	*dbpp = dbp;
236 	return (0);
237 }
238 
239 static int
240 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
241     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
242 {
243 	dnode_t *dn;
244 	int err;
245 
246 	err = dnode_hold(os->os, object, FTAG, &dn);
247 	if (err)
248 		return (err);
249 
250 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
251 	    numbufsp, dbpp);
252 
253 	dnode_rele(dn, FTAG);
254 
255 	return (err);
256 }
257 
258 int
259 dmu_buf_hold_array_by_bonus(dmu_buf_t *db, uint64_t offset,
260     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
261 {
262 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
263 	int err;
264 
265 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
266 	    numbufsp, dbpp);
267 
268 	return (err);
269 }
270 
271 void
272 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
273 {
274 	int i;
275 	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
276 
277 	if (numbufs == 0)
278 		return;
279 
280 	for (i = 0; i < numbufs; i++) {
281 		if (dbp[i])
282 			dbuf_rele(dbp[i], tag);
283 	}
284 
285 	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
286 }
287 
288 void
289 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
290 {
291 	dnode_t *dn;
292 	uint64_t blkid;
293 	int nblks, i, err;
294 
295 	if (len == 0) {  /* they're interested in the bonus buffer */
296 		dn = os->os->os_meta_dnode;
297 
298 		if (object == 0 || object >= DN_MAX_OBJECT)
299 			return;
300 
301 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
302 		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
303 		dbuf_prefetch(dn, blkid);
304 		rw_exit(&dn->dn_struct_rwlock);
305 		return;
306 	}
307 
308 	/*
309 	 * XXX - Note, if the dnode for the requested object is not
310 	 * already cached, we will do a *synchronous* read in the
311 	 * dnode_hold() call.  The same is true for any indirects.
312 	 */
313 	err = dnode_hold(os->os, object, FTAG, &dn);
314 	if (err != 0)
315 		return;
316 
317 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
318 	if (dn->dn_datablkshift) {
319 		int blkshift = dn->dn_datablkshift;
320 		nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
321 			P2ALIGN(offset, 1<<blkshift)) >> blkshift;
322 	} else {
323 		nblks = (offset < dn->dn_datablksz);
324 	}
325 
326 	if (nblks != 0) {
327 		blkid = dbuf_whichblock(dn, offset);
328 		for (i = 0; i < nblks; i++)
329 			dbuf_prefetch(dn, blkid+i);
330 	}
331 
332 	rw_exit(&dn->dn_struct_rwlock);
333 
334 	dnode_rele(dn, FTAG);
335 }
336 
337 int
338 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
339     uint64_t size, dmu_tx_t *tx)
340 {
341 	dnode_t *dn;
342 	int err = dnode_hold(os->os, object, FTAG, &dn);
343 	if (err)
344 		return (err);
345 	ASSERT(offset < UINT64_MAX);
346 	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
347 	dnode_free_range(dn, offset, size, tx);
348 	dnode_rele(dn, FTAG);
349 	return (0);
350 }
351 
352 int
353 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
354     void *buf)
355 {
356 	dnode_t *dn;
357 	dmu_buf_t **dbp;
358 	int numbufs, i, err;
359 
360 	/*
361 	 * Deal with odd block sizes, where there can't be data past the
362 	 * first block.
363 	 */
364 	err = dnode_hold(os->os, object, FTAG, &dn);
365 	if (err)
366 		return (err);
367 	if (dn->dn_datablkshift == 0) {
368 		int newsz = offset > dn->dn_datablksz ? 0 :
369 		    MIN(size, dn->dn_datablksz - offset);
370 		bzero((char *)buf + newsz, size - newsz);
371 		size = newsz;
372 	}
373 
374 	while (size > 0) {
375 		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
376 		int err;
377 
378 		/*
379 		 * NB: we could do this block-at-a-time, but it's nice
380 		 * to be reading in parallel.
381 		 */
382 		err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
383 		    TRUE, FTAG, &numbufs, &dbp);
384 		if (err)
385 			return (err);
386 
387 		for (i = 0; i < numbufs; i++) {
388 			int tocpy;
389 			int bufoff;
390 			dmu_buf_t *db = dbp[i];
391 
392 			ASSERT(size > 0);
393 
394 			bufoff = offset - db->db_offset;
395 			tocpy = (int)MIN(db->db_size - bufoff, size);
396 
397 			bcopy((char *)db->db_data + bufoff, buf, tocpy);
398 
399 			offset += tocpy;
400 			size -= tocpy;
401 			buf = (char *)buf + tocpy;
402 		}
403 		dmu_buf_rele_array(dbp, numbufs, FTAG);
404 	}
405 	dnode_rele(dn, FTAG);
406 	return (0);
407 }
408 
409 void
410 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
411     const void *buf, dmu_tx_t *tx)
412 {
413 	dmu_buf_t **dbp;
414 	int numbufs, i;
415 
416 	if (size == 0)
417 		return;
418 
419 	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
420 	    FALSE, FTAG, &numbufs, &dbp));
421 
422 	for (i = 0; i < numbufs; i++) {
423 		int tocpy;
424 		int bufoff;
425 		dmu_buf_t *db = dbp[i];
426 
427 		ASSERT(size > 0);
428 
429 		bufoff = offset - db->db_offset;
430 		tocpy = (int)MIN(db->db_size - bufoff, size);
431 
432 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
433 
434 		if (tocpy == db->db_size)
435 			dmu_buf_will_fill(db, tx);
436 		else
437 			dmu_buf_will_dirty(db, tx);
438 
439 		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
440 
441 		if (tocpy == db->db_size)
442 			dmu_buf_fill_done(db, tx);
443 
444 		offset += tocpy;
445 		size -= tocpy;
446 		buf = (char *)buf + tocpy;
447 	}
448 	dmu_buf_rele_array(dbp, numbufs, FTAG);
449 }
450 
451 #ifdef _KERNEL
452 int
453 dmu_write_uio(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
454     uio_t *uio, dmu_tx_t *tx)
455 {
456 	dmu_buf_t **dbp;
457 	int numbufs, i;
458 	int err = 0;
459 
460 	if (size == 0)
461 		return (0);
462 
463 	err = dmu_buf_hold_array(os, object, offset, size,
464 	    FALSE, FTAG, &numbufs, &dbp);
465 	if (err)
466 		return (err);
467 
468 	for (i = 0; i < numbufs; i++) {
469 		int tocpy;
470 		int bufoff;
471 		dmu_buf_t *db = dbp[i];
472 
473 		ASSERT(size > 0);
474 
475 		bufoff = offset - db->db_offset;
476 		tocpy = (int)MIN(db->db_size - bufoff, size);
477 
478 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
479 
480 		if (tocpy == db->db_size)
481 			dmu_buf_will_fill(db, tx);
482 		else
483 			dmu_buf_will_dirty(db, tx);
484 
485 		/*
486 		 * XXX uiomove could block forever (eg. nfs-backed
487 		 * pages).  There needs to be a uiolockdown() function
488 		 * to lock the pages in memory, so that uiomove won't
489 		 * block.
490 		 */
491 		err = uiomove((char *)db->db_data + bufoff, tocpy,
492 		    UIO_WRITE, uio);
493 
494 		if (tocpy == db->db_size)
495 			dmu_buf_fill_done(db, tx);
496 
497 		if (err)
498 			break;
499 
500 		offset += tocpy;
501 		size -= tocpy;
502 	}
503 	dmu_buf_rele_array(dbp, numbufs, FTAG);
504 	return (err);
505 }
506 
507 int
508 dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
509     page_t *pp, dmu_tx_t *tx)
510 {
511 	dmu_buf_t **dbp;
512 	int numbufs, i;
513 	int err;
514 
515 	if (size == 0)
516 		return (0);
517 
518 	err = dmu_buf_hold_array(os, object, offset, size,
519 	    FALSE, FTAG, &numbufs, &dbp);
520 	if (err)
521 		return (err);
522 
523 	for (i = 0; i < numbufs; i++) {
524 		int tocpy, copied, thiscpy;
525 		int bufoff;
526 		dmu_buf_t *db = dbp[i];
527 		caddr_t va;
528 
529 		ASSERT(size > 0);
530 		ASSERT3U(db->db_size, >=, PAGESIZE);
531 
532 		bufoff = offset - db->db_offset;
533 		tocpy = (int)MIN(db->db_size - bufoff, size);
534 
535 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
536 
537 		if (tocpy == db->db_size)
538 			dmu_buf_will_fill(db, tx);
539 		else
540 			dmu_buf_will_dirty(db, tx);
541 
542 		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
543 			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
544 			thiscpy = MIN(PAGESIZE, tocpy - copied);
545 			va = ppmapin(pp, PROT_READ, (caddr_t)-1);
546 			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
547 			ppmapout(va);
548 			pp = pp->p_next;
549 			bufoff += PAGESIZE;
550 		}
551 
552 		if (tocpy == db->db_size)
553 			dmu_buf_fill_done(db, tx);
554 
555 		if (err)
556 			break;
557 
558 		offset += tocpy;
559 		size -= tocpy;
560 	}
561 	dmu_buf_rele_array(dbp, numbufs, FTAG);
562 	return (err);
563 }
564 #endif
565 
/*
 * Input record that dmu_sync() fills in and hands to dmu_sync_done()
 * as the arc_write() callback argument.
 */
typedef struct {
	uint64_t	txg;	/* txg passed to dmu_sync() */
	dmu_buf_impl_t	*db;	/* dbuf being synced */
	dmu_sync_cb_t	*done;	/* caller's callback; skipped if NULL */
	void		*arg;	/* argument passed to `done' */
} dmu_sync_cbin_t;
572 
/*
 * Overlay of the callback input record and a blkptr_t in the same
 * storage.  dmu_sync_done() treats its argument this way: it reads the
 * input fields and then writes a blkptr_t over the same memory.
 *
 * NOTE(review): nothing visible in this file refers to this type by
 * name; dmu_sync() allocates sizeof (blkptr_t) bytes directly.
 */
typedef union {
	dmu_sync_cbin_t	data;
	blkptr_t	blk;
} dmu_sync_cbarg_t;
577 
578 /* ARGSUSED */
579 static void
580 dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
581 {
582 	dmu_sync_cbin_t *in = (dmu_sync_cbin_t *)varg;
583 	dmu_buf_impl_t *db = in->db;
584 	uint64_t txg = in->txg;
585 	dmu_sync_cb_t *done = in->done;
586 	void *arg = in->arg;
587 	blkptr_t *blk = (blkptr_t *)varg;
588 
589 	if (!BP_IS_HOLE(zio->io_bp)) {
590 		zio->io_bp->blk_fill = 1;
591 		BP_SET_TYPE(zio->io_bp, db->db_dnode->dn_type);
592 		BP_SET_LEVEL(zio->io_bp, 0);
593 	}
594 
595 	*blk = *zio->io_bp; /* structure assignment */
596 
597 	mutex_enter(&db->db_mtx);
598 	ASSERT(db->db_d.db_overridden_by[txg&TXG_MASK] == IN_DMU_SYNC);
599 	db->db_d.db_overridden_by[txg&TXG_MASK] = blk;
600 	cv_broadcast(&db->db_changed);
601 	mutex_exit(&db->db_mtx);
602 
603 	if (done)
604 		done(&(db->db), arg);
605 }
606 
/*
 * Intent log support: sync the block associated with db to disk.
 * N.B. and XXX: the caller is responsible for making sure that the
 * data isn't changing while dmu_sync() is writing it.
 *
 * Parameters:
 *
 *	pio:	parent zio.  If NULL the write is issued with ARC_WAIT
 *		and 0 is returned; otherwise it is issued with
 *		ARC_NOWAIT and EINPROGRESS is returned.
 *	db_fake: the buffer to sync.
 *	bp:	out: receives the block pointer.  Asserted to be a hole
 *		on entry.
 *	txg:	the transaction group of the write; asserted nonzero.
 *	done, arg: callback and its argument, invoked by dmu_sync_done()
 *		when a write issued here completes (done may be NULL).
 *
 * Return values:
 *
 *	EEXIST: this txg has already been synced, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	EALREADY: this block is already in the process of being synced.
 *		The caller should track its progress (somehow).
 *
 *	EINPROGRESS: the IO has been initiated.
 *		The caller should log this blkptr in the callback.
 *
 *	0: completed.  Sets *bp to the blkptr just written.
 *		The caller should log this blkptr immediately.
 */
int
dmu_sync(zio_t *pio, dmu_buf_t *db_fake,
    blkptr_t *bp, uint64_t txg, dmu_sync_cb_t *done, void *arg)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
	objset_impl_t *os = db->db_objset;
	dsl_pool_t *dp = os->os_dsl_dataset->ds_dir->dd_pool;
	tx_state_t *tx = &dp->dp_tx;
	dmu_sync_cbin_t *in;
	blkptr_t *blk;
	zbookmark_t zb;
	uint32_t arc_flag;
	int err;

	ASSERT(BP_IS_HOLE(bp));
	ASSERT(txg != 0);


	dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
	    txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);

	/*
	 * XXX - would be nice if we could do this without suspending...
	 */
	txg_suspend(dp);

	/*
	 * If this txg already synced, there's nothing to do.
	 */
	if (txg <= tx->tx_synced_txg) {
		txg_resume(dp);
		/*
		 * If we're running ziltest, we need the blkptr regardless.
		 */
		if (txg > spa_freeze_txg(dp->dp_spa)) {
			/* if db_blkptr == NULL, this was an empty write */
			if (db->db_blkptr)
				*bp = *db->db_blkptr; /* structure assignment */
			return (0);
		}
		return (EEXIST);
	}

	/* Every return from here on must drop db_mtx and resume the txg. */
	mutex_enter(&db->db_mtx);

	/*
	 * The per-txg override slot is NULL, IN_DMU_SYNC (a write issued
	 * by an earlier dmu_sync() call has not completed yet), or the
	 * blkptr stored by dmu_sync_done().
	 */
	blk = db->db_d.db_overridden_by[txg&TXG_MASK];
	if (blk == IN_DMU_SYNC) {
		/*
		 * We have already issued a sync write for this buffer.
		 */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (EALREADY);
	} else if (blk != NULL) {
		/*
		 * This buffer had already been synced.  It could not
		 * have been dirtied since, or we would have cleared blk.
		 */
		*bp = *blk; /* structure assignment */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (0);
	}

	if (txg == tx->tx_syncing_txg) {
		while (db->db_data_pending) {
			/*
			 * IO is in-progress.  Wait for it to finish.
			 * XXX - would be nice to be able to somehow "attach"
			 * this zio to the parent zio passed in.
			 */
			cv_wait(&db->db_changed, &db->db_mtx);
			if (!db->db_data_pending &&
			    db->db_blkptr && BP_IS_HOLE(db->db_blkptr)) {
				/*
				 * IO was compressed away
				 */
				*bp = *db->db_blkptr; /* structure assignment */
				mutex_exit(&db->db_mtx);
				txg_resume(dp);
				return (0);
			}
			ASSERT(db->db_data_pending ||
			    (db->db_blkptr && db->db_blkptr->blk_birth == txg));
		}

		if (db->db_blkptr && db->db_blkptr->blk_birth == txg) {
			/*
			 * IO is already completed.
			 */
			*bp = *db->db_blkptr; /* structure assignment */
			mutex_exit(&db->db_mtx);
			txg_resume(dp);
			return (0);
		}
	}

	if (db->db_d.db_data_old[txg&TXG_MASK] == NULL) {
		/*
		 * This dbuf isn't dirty, must have been free_range'd.
		 * There's no need to log writes to freed blocks, so we're done.
		 */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (ENOENT);
	}

	/* Mark the slot so a concurrent dmu_sync() returns EALREADY. */
	ASSERT(db->db_d.db_overridden_by[txg&TXG_MASK] == NULL);
	db->db_d.db_overridden_by[txg&TXG_MASK] = IN_DMU_SYNC;
	/*
	 * XXX - a little ugly to stash the blkptr in the callback
	 * buffer.  We always need to make sure the following is true:
	 * ASSERT(sizeof(blkptr_t) >= sizeof(dmu_sync_cbin_t));
	 */
	in = kmem_alloc(sizeof (blkptr_t), KM_SLEEP);
	in->db = db;
	in->txg = txg;
	in->done = done;
	in->arg = arg;
	mutex_exit(&db->db_mtx);
	txg_resume(dp);

	/* Without a parent zio to attach to, wait for the write here. */
	arc_flag = pio == NULL ? ARC_WAIT : ARC_NOWAIT;
	zb.zb_objset = os->os_dsl_dataset->ds_object;
	zb.zb_object = db->db.db_object;
	zb.zb_level = db->db_level;
	zb.zb_blkid = db->db_blkid;
	err = arc_write(pio, os->os_spa,
	    zio_checksum_select(db->db_dnode->dn_checksum, os->os_checksum),
	    zio_compress_select(db->db_dnode->dn_compress, os->os_compress),
	    dmu_get_replication_level(os->os_spa, &zb, db->db_dnode->dn_type),
	    txg, bp, db->db_d.db_data_old[txg&TXG_MASK], dmu_sync_done, in,
	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, arc_flag, &zb);
	ASSERT(err == 0);

	return (arc_flag == ARC_NOWAIT ? EINPROGRESS : 0);
}
766 
767 int
768 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
769 	dmu_tx_t *tx)
770 {
771 	dnode_t *dn;
772 	int err;
773 
774 	err = dnode_hold(os->os, object, FTAG, &dn);
775 	if (err)
776 		return (err);
777 	err = dnode_set_blksz(dn, size, ibs, tx);
778 	dnode_rele(dn, FTAG);
779 	return (err);
780 }
781 
782 void
783 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
784 	dmu_tx_t *tx)
785 {
786 	dnode_t *dn;
787 
788 	/* XXX assumes dnode_hold will not get an i/o error */
789 	(void) dnode_hold(os->os, object, FTAG, &dn);
790 	ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
791 	dn->dn_checksum = checksum;
792 	dnode_setdirty(dn, tx);
793 	dnode_rele(dn, FTAG);
794 }
795 
796 void
797 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
798 	dmu_tx_t *tx)
799 {
800 	dnode_t *dn;
801 
802 	/* XXX assumes dnode_hold will not get an i/o error */
803 	(void) dnode_hold(os->os, object, FTAG, &dn);
804 	ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
805 	dn->dn_compress = compress;
806 	dnode_setdirty(dn, tx);
807 	dnode_rele(dn, FTAG);
808 }
809 
810 /*
811  * XXX - eventually, this should take into account per-dataset (or
812  *       even per-object?) user requests for higher levels of replication.
813  */
814 int
815 dmu_get_replication_level(spa_t *spa, zbookmark_t *zb, dmu_object_type_t ot)
816 {
817 	int ncopies = 1;
818 
819 	if (dmu_ot[ot].ot_metadata)
820 		ncopies++;
821 	if (zb->zb_level != 0)
822 		ncopies++;
823 	if (zb->zb_objset == 0 && zb->zb_object == 0)
824 		ncopies++;
825 	return (MIN(ncopies, spa_max_replication(spa)));
826 }
827 
828 int
829 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
830 {
831 	dnode_t *dn;
832 	int i, err;
833 
834 	err = dnode_hold(os->os, object, FTAG, &dn);
835 	if (err)
836 		return (err);
837 	/*
838 	 * Sync any current changes before
839 	 * we go trundling through the block pointers.
840 	 */
841 	for (i = 0; i < TXG_SIZE; i++) {
842 		if (list_link_active(&dn->dn_dirty_link[i]))
843 			break;
844 	}
845 	if (i != TXG_SIZE) {
846 		dnode_rele(dn, FTAG);
847 		txg_wait_synced(dmu_objset_pool(os), 0);
848 		err = dnode_hold(os->os, object, FTAG, &dn);
849 		if (err)
850 			return (err);
851 	}
852 
853 	err = dnode_next_offset(dn, hole, off, 1, 1);
854 	dnode_rele(dn, FTAG);
855 
856 	return (err);
857 }
858 
859 void
860 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
861 {
862 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
863 	mutex_enter(&dn->dn_mtx);
864 
865 	doi->doi_data_block_size = dn->dn_datablksz;
866 	doi->doi_metadata_block_size = dn->dn_indblkshift ?
867 	    1ULL << dn->dn_indblkshift : 0;
868 	doi->doi_indirection = dn->dn_nlevels;
869 	doi->doi_checksum = dn->dn_checksum;
870 	doi->doi_compress = dn->dn_compress;
871 	doi->doi_physical_blks = (DN_USED_BYTES(dn->dn_phys) +
872 	    SPA_MINBLOCKSIZE/2) >> SPA_MINBLOCKSHIFT;
873 	doi->doi_max_block_offset = dn->dn_phys->dn_maxblkid;
874 	doi->doi_type = dn->dn_type;
875 	doi->doi_bonus_size = dn->dn_bonuslen;
876 	doi->doi_bonus_type = dn->dn_bonustype;
877 
878 	mutex_exit(&dn->dn_mtx);
879 	rw_exit(&dn->dn_struct_rwlock);
880 }
881 
882 /*
883  * Get information on a DMU object.
884  * If doi is NULL, just indicates whether the object exists.
885  */
886 int
887 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
888 {
889 	dnode_t *dn;
890 	int err = dnode_hold(os->os, object, FTAG, &dn);
891 
892 	if (err)
893 		return (err);
894 
895 	if (doi != NULL)
896 		dmu_object_info_from_dnode(dn, doi);
897 
898 	dnode_rele(dn, FTAG);
899 	return (0);
900 }
901 
902 /*
903  * As above, but faster; can be used when you have a held dbuf in hand.
904  */
905 void
906 dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
907 {
908 	dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
909 }
910 
911 /*
912  * Faster still when you only care about the size.
913  * This is specifically optimized for zfs_getattr().
914  */
915 void
916 dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
917 {
918 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
919 
920 	*blksize = dn->dn_datablksz;
921 	/* add 1 for dnode space */
922 	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
923 	    SPA_MINBLOCKSHIFT) + 1;
924 }
925 
926 /*
927  * Given a bookmark, return the name of the dataset, object, and range in
928  * human-readable format.
929  */
930 int
931 spa_bookmark_name(spa_t *spa, zbookmark_t *zb, nvlist_t *nvl)
932 {
933 	dsl_pool_t *dp;
934 	dsl_dataset_t *ds = NULL;
935 	objset_t *os = NULL;
936 	dnode_t *dn = NULL;
937 	int err, shift;
938 	char dsname[MAXNAMELEN];
939 	char objname[32];
940 	char range[64];
941 
942 	dp = spa_get_dsl(spa);
943 	if (zb->zb_objset != 0) {
944 		rw_enter(&dp->dp_config_rwlock, RW_READER);
945 		err = dsl_dataset_open_obj(dp, zb->zb_objset,
946 		    NULL, DS_MODE_NONE, FTAG, &ds);
947 		if (err) {
948 			rw_exit(&dp->dp_config_rwlock);
949 			return (err);
950 		}
951 		dsl_dataset_name(ds, dsname);
952 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
953 		rw_exit(&dp->dp_config_rwlock);
954 
955 		err = dmu_objset_open(dsname, DMU_OST_ANY, DS_MODE_NONE, &os);
956 		if (err)
957 			goto out;
958 
959 	} else {
960 		dsl_dataset_name(NULL, dsname);
961 		os = dp->dp_meta_objset;
962 	}
963 
964 
965 	if (zb->zb_object == DMU_META_DNODE_OBJECT) {
966 		(void) strncpy(objname, "mdn", sizeof (objname));
967 	} else {
968 		(void) snprintf(objname, sizeof (objname), "%lld",
969 		    (longlong_t)zb->zb_object);
970 	}
971 
972 	err = dnode_hold(os->os, zb->zb_object, FTAG, &dn);
973 	if (err)
974 		goto out;
975 
976 	shift = (dn->dn_datablkshift?dn->dn_datablkshift:SPA_MAXBLOCKSHIFT) +
977 	    zb->zb_level * (dn->dn_indblkshift - SPA_BLKPTRSHIFT);
978 	(void) snprintf(range, sizeof (range), "%llu-%llu",
979 	    (u_longlong_t)(zb->zb_blkid << shift),
980 	    (u_longlong_t)((zb->zb_blkid+1) << shift));
981 
982 	if ((err = nvlist_add_string(nvl, ZPOOL_ERR_DATASET, dsname)) != 0 ||
983 	    (err = nvlist_add_string(nvl, ZPOOL_ERR_OBJECT, objname)) != 0 ||
984 	    (err = nvlist_add_string(nvl, ZPOOL_ERR_RANGE, range)) != 0)
985 		goto out;
986 
987 out:
988 	if (dn)
989 		dnode_rele(dn, FTAG);
990 	if (os && os != dp->dp_meta_objset)
991 		dmu_objset_close(os);
992 	return (err);
993 }
994 
/*
 * Byteswap, in place, an array of 64-bit words.
 *
 *	vbuf	start of the array
 *	size	length of the array in bytes; asserted to be a multiple of 8
 */
void
byteswap_uint64_array(void *vbuf, size_t size)
{
	uint64_t *buf = vbuf;
	size_t count = size >> 3;
	size_t i;	/* same type as count: no signed/unsigned compare */

	ASSERT((size & 7) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_64(buf[i]);
}
1007 
/*
 * Byteswap, in place, an array of 32-bit words.
 *
 *	vbuf	start of the array
 *	size	length of the array in bytes; asserted to be a multiple of 4
 */
void
byteswap_uint32_array(void *vbuf, size_t size)
{
	uint32_t *buf = vbuf;
	size_t count = size >> 2;
	size_t i;	/* same type as count: no signed/unsigned compare */

	ASSERT((size & 3) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_32(buf[i]);
}
1020 
/*
 * Byteswap, in place, an array of 16-bit words.
 *
 *	vbuf	start of the array
 *	size	length of the array in bytes; asserted to be a multiple of 2
 */
void
byteswap_uint16_array(void *vbuf, size_t size)
{
	uint16_t *buf = vbuf;
	size_t count = size >> 1;
	size_t i;	/* same type as count: no signed/unsigned compare */

	ASSERT((size & 1) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_16(buf[i]);
}
1033 
/*
 * Byteswap routine for arrays of single bytes.  It does nothing and
 * leaves the buffer untouched; it exists so such types still have a
 * function to put in the dmu_ot[] table.
 */
/* ARGSUSED */
void
byteswap_uint8_array(void *vbuf, size_t size)
{
	/* Intentionally empty: a single byte has no byte order. */
}
1039 
/*
 * Initialize the DMU by calling dbuf_init(), dnode_init() and
 * arc_init(), in that order.  dmu_fini() calls the matching *_fini()
 * routines in the reverse order.
 */
void
dmu_init(void)
{
	dbuf_init();
	dnode_init();
	arc_init();
}
1047 
/*
 * Tear down the DMU by calling arc_fini(), dnode_fini() and
 * dbuf_fini(), in that order -- the reverse of the order in which
 * dmu_init() calls the matching *_init() routines.
 */
void
dmu_fini(void)
{
	arc_fini();
	dnode_fini();
	dbuf_fini();
}
1055