xref: /illumos-gate/usr/src/uts/common/fs/zfs/dmu.c (revision 55434c770c89aa1b84474f2559a106803511aba0)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dbuf.h>
32 #include <sys/dnode.h>
33 #include <sys/zfs_context.h>
34 #include <sys/dmu_objset.h>
35 #include <sys/dmu_traverse.h>
36 #include <sys/dsl_dataset.h>
37 #include <sys/dsl_dir.h>
38 #include <sys/dsl_pool.h>
39 #include <sys/dsl_synctask.h>
40 #include <sys/dsl_prop.h>
41 #include <sys/dmu_zfetch.h>
42 #include <sys/zfs_ioctl.h>
43 #include <sys/zap.h>
44 #include <sys/zio_checksum.h>
45 #ifdef _KERNEL
46 #include <sys/vmsystm.h>
47 #endif
48 
49 const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
50 	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
51 	{	zap_byteswap,		TRUE,	"object directory"	},
52 	{	byteswap_uint64_array,	TRUE,	"object array"		},
53 	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
54 	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
55 	{	byteswap_uint64_array,	TRUE,	"bplist"		},
56 	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
57 	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
58 	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
59 	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
60 	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
61 	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
62 	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
63 	{	zap_byteswap,		TRUE,	"DSL directory child map"},
64 	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
65 	{	zap_byteswap,		TRUE,	"DSL props"		},
66 	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
67 	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
68 	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
69 	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
70 	{	zap_byteswap,		TRUE,	"ZFS directory"		},
71 	{	zap_byteswap,		TRUE,	"ZFS master node"	},
72 	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
73 	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
74 	{	zap_byteswap,		TRUE,	"zvol prop"		},
75 	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
76 	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
77 	{	zap_byteswap,		TRUE,	"other ZAP"		},
78 	{	zap_byteswap,		TRUE,	"persistent error log"	},
79 	{	byteswap_uint8_array,	TRUE,	"SPA history"		},
80 	{	byteswap_uint64_array,	TRUE,	"SPA history offsets"	},
81 };
82 
83 int
84 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
85     void *tag, dmu_buf_t **dbp)
86 {
87 	dnode_t *dn;
88 	uint64_t blkid;
89 	dmu_buf_impl_t *db;
90 	int err;
91 
92 	err = dnode_hold(os->os, object, FTAG, &dn);
93 	if (err)
94 		return (err);
95 	blkid = dbuf_whichblock(dn, offset);
96 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
97 	db = dbuf_hold(dn, blkid, tag);
98 	rw_exit(&dn->dn_struct_rwlock);
99 	if (db == NULL) {
100 		err = EIO;
101 	} else {
102 		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
103 		if (err) {
104 			dbuf_rele(db, tag);
105 			db = NULL;
106 		}
107 	}
108 
109 	dnode_rele(dn, FTAG);
110 	*dbp = &db->db;
111 	return (err);
112 }
113 
114 int
115 dmu_bonus_max(void)
116 {
117 	return (DN_MAX_BONUSLEN);
118 }
119 
120 /*
121  * returns ENOENT, EIO, or 0.
122  */
123 int
124 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
125 {
126 	dnode_t *dn;
127 	int err, count;
128 	dmu_buf_impl_t *db;
129 
130 	err = dnode_hold(os->os, object, FTAG, &dn);
131 	if (err)
132 		return (err);
133 
134 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
135 	if (dn->dn_bonus == NULL) {
136 		rw_exit(&dn->dn_struct_rwlock);
137 		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
138 		if (dn->dn_bonus == NULL)
139 			dn->dn_bonus = dbuf_create_bonus(dn);
140 	}
141 	db = dn->dn_bonus;
142 	rw_exit(&dn->dn_struct_rwlock);
143 	mutex_enter(&db->db_mtx);
144 	count = refcount_add(&db->db_holds, tag);
145 	mutex_exit(&db->db_mtx);
146 	if (count == 1)
147 		dnode_add_ref(dn, db);
148 	dnode_rele(dn, FTAG);
149 
150 	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
151 
152 	*dbp = &db->db;
153 	return (0);
154 }
155 
156 /*
157  * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
158  * to take a held dnode rather than <os, object> -- the lookup is wasteful,
159  * and can induce severe lock contention when writing to several files
160  * whose dnodes are in the same block.
161  */
162 static int
163 dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset,
164     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
165 {
166 	dmu_buf_t **dbp;
167 	uint64_t blkid, nblks, i;
168 	uint32_t flags;
169 	int err;
170 	zio_t *zio;
171 
172 	ASSERT(length <= DMU_MAX_ACCESS);
173 
174 	flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT;
175 	if (length > zfetch_array_rd_sz)
176 		flags |= DB_RF_NOPREFETCH;
177 
178 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
179 	if (dn->dn_datablkshift) {
180 		int blkshift = dn->dn_datablkshift;
181 		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
182 			P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
183 	} else {
184 		ASSERT3U(offset + length, <=, dn->dn_datablksz);
185 		nblks = 1;
186 	}
187 	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
188 
189 	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, TRUE);
190 	blkid = dbuf_whichblock(dn, offset);
191 	for (i = 0; i < nblks; i++) {
192 		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
193 		if (db == NULL) {
194 			rw_exit(&dn->dn_struct_rwlock);
195 			dmu_buf_rele_array(dbp, nblks, tag);
196 			zio_nowait(zio);
197 			return (EIO);
198 		}
199 		/* initiate async i/o */
200 		if (read) {
201 			rw_exit(&dn->dn_struct_rwlock);
202 			(void) dbuf_read(db, zio, flags);
203 			rw_enter(&dn->dn_struct_rwlock, RW_READER);
204 		}
205 		dbp[i] = &db->db;
206 	}
207 	rw_exit(&dn->dn_struct_rwlock);
208 
209 	/* wait for async i/o */
210 	err = zio_wait(zio);
211 	if (err) {
212 		dmu_buf_rele_array(dbp, nblks, tag);
213 		return (err);
214 	}
215 
216 	/* wait for other io to complete */
217 	if (read) {
218 		for (i = 0; i < nblks; i++) {
219 			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
220 			mutex_enter(&db->db_mtx);
221 			while (db->db_state == DB_READ ||
222 			    db->db_state == DB_FILL)
223 				cv_wait(&db->db_changed, &db->db_mtx);
224 			if (db->db_state == DB_UNCACHED)
225 				err = EIO;
226 			mutex_exit(&db->db_mtx);
227 			if (err) {
228 				dmu_buf_rele_array(dbp, nblks, tag);
229 				return (err);
230 			}
231 		}
232 	}
233 
234 	*numbufsp = nblks;
235 	*dbpp = dbp;
236 	return (0);
237 }
238 
239 static int
240 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
241     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
242 {
243 	dnode_t *dn;
244 	int err;
245 
246 	err = dnode_hold(os->os, object, FTAG, &dn);
247 	if (err)
248 		return (err);
249 
250 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
251 	    numbufsp, dbpp);
252 
253 	dnode_rele(dn, FTAG);
254 
255 	return (err);
256 }
257 
258 int
259 dmu_buf_hold_array_by_bonus(dmu_buf_t *db, uint64_t offset,
260     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
261 {
262 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
263 	int err;
264 
265 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
266 	    numbufsp, dbpp);
267 
268 	return (err);
269 }
270 
271 void
272 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
273 {
274 	int i;
275 	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
276 
277 	if (numbufs == 0)
278 		return;
279 
280 	for (i = 0; i < numbufs; i++) {
281 		if (dbp[i])
282 			dbuf_rele(dbp[i], tag);
283 	}
284 
285 	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
286 }
287 
288 void
289 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
290 {
291 	dnode_t *dn;
292 	uint64_t blkid;
293 	int nblks, i, err;
294 
295 	if (zfs_prefetch_disable)
296 		return;
297 
298 	if (len == 0) {  /* they're interested in the bonus buffer */
299 		dn = os->os->os_meta_dnode;
300 
301 		if (object == 0 || object >= DN_MAX_OBJECT)
302 			return;
303 
304 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
305 		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
306 		dbuf_prefetch(dn, blkid);
307 		rw_exit(&dn->dn_struct_rwlock);
308 		return;
309 	}
310 
311 	/*
312 	 * XXX - Note, if the dnode for the requested object is not
313 	 * already cached, we will do a *synchronous* read in the
314 	 * dnode_hold() call.  The same is true for any indirects.
315 	 */
316 	err = dnode_hold(os->os, object, FTAG, &dn);
317 	if (err != 0)
318 		return;
319 
320 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
321 	if (dn->dn_datablkshift) {
322 		int blkshift = dn->dn_datablkshift;
323 		nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
324 			P2ALIGN(offset, 1<<blkshift)) >> blkshift;
325 	} else {
326 		nblks = (offset < dn->dn_datablksz);
327 	}
328 
329 	if (nblks != 0) {
330 		blkid = dbuf_whichblock(dn, offset);
331 		for (i = 0; i < nblks; i++)
332 			dbuf_prefetch(dn, blkid+i);
333 	}
334 
335 	rw_exit(&dn->dn_struct_rwlock);
336 
337 	dnode_rele(dn, FTAG);
338 }
339 
340 int
341 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
342     uint64_t size, dmu_tx_t *tx)
343 {
344 	dnode_t *dn;
345 	int err = dnode_hold(os->os, object, FTAG, &dn);
346 	if (err)
347 		return (err);
348 	ASSERT(offset < UINT64_MAX);
349 	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
350 	dnode_free_range(dn, offset, size, tx);
351 	dnode_rele(dn, FTAG);
352 	return (0);
353 }
354 
355 int
356 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
357     void *buf)
358 {
359 	dnode_t *dn;
360 	dmu_buf_t **dbp;
361 	int numbufs, i, err;
362 
363 	/*
364 	 * Deal with odd block sizes, where there can't be data past the
365 	 * first block.
366 	 */
367 	err = dnode_hold(os->os, object, FTAG, &dn);
368 	if (err)
369 		return (err);
370 	if (dn->dn_datablkshift == 0) {
371 		int newsz = offset > dn->dn_datablksz ? 0 :
372 		    MIN(size, dn->dn_datablksz - offset);
373 		bzero((char *)buf + newsz, size - newsz);
374 		size = newsz;
375 	}
376 
377 	while (size > 0) {
378 		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
379 		int err;
380 
381 		/*
382 		 * NB: we could do this block-at-a-time, but it's nice
383 		 * to be reading in parallel.
384 		 */
385 		err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
386 		    TRUE, FTAG, &numbufs, &dbp);
387 		if (err)
388 			return (err);
389 
390 		for (i = 0; i < numbufs; i++) {
391 			int tocpy;
392 			int bufoff;
393 			dmu_buf_t *db = dbp[i];
394 
395 			ASSERT(size > 0);
396 
397 			bufoff = offset - db->db_offset;
398 			tocpy = (int)MIN(db->db_size - bufoff, size);
399 
400 			bcopy((char *)db->db_data + bufoff, buf, tocpy);
401 
402 			offset += tocpy;
403 			size -= tocpy;
404 			buf = (char *)buf + tocpy;
405 		}
406 		dmu_buf_rele_array(dbp, numbufs, FTAG);
407 	}
408 	dnode_rele(dn, FTAG);
409 	return (0);
410 }
411 
412 void
413 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
414     const void *buf, dmu_tx_t *tx)
415 {
416 	dmu_buf_t **dbp;
417 	int numbufs, i;
418 
419 	if (size == 0)
420 		return;
421 
422 	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
423 	    FALSE, FTAG, &numbufs, &dbp));
424 
425 	for (i = 0; i < numbufs; i++) {
426 		int tocpy;
427 		int bufoff;
428 		dmu_buf_t *db = dbp[i];
429 
430 		ASSERT(size > 0);
431 
432 		bufoff = offset - db->db_offset;
433 		tocpy = (int)MIN(db->db_size - bufoff, size);
434 
435 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
436 
437 		if (tocpy == db->db_size)
438 			dmu_buf_will_fill(db, tx);
439 		else
440 			dmu_buf_will_dirty(db, tx);
441 
442 		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
443 
444 		if (tocpy == db->db_size)
445 			dmu_buf_fill_done(db, tx);
446 
447 		offset += tocpy;
448 		size -= tocpy;
449 		buf = (char *)buf + tocpy;
450 	}
451 	dmu_buf_rele_array(dbp, numbufs, FTAG);
452 }
453 
454 #ifdef _KERNEL
455 int
456 dmu_write_uio(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
457     uio_t *uio, dmu_tx_t *tx)
458 {
459 	dmu_buf_t **dbp;
460 	int numbufs, i;
461 	int err = 0;
462 
463 	if (size == 0)
464 		return (0);
465 
466 	err = dmu_buf_hold_array(os, object, offset, size,
467 	    FALSE, FTAG, &numbufs, &dbp);
468 	if (err)
469 		return (err);
470 
471 	for (i = 0; i < numbufs; i++) {
472 		int tocpy;
473 		int bufoff;
474 		dmu_buf_t *db = dbp[i];
475 
476 		ASSERT(size > 0);
477 
478 		bufoff = offset - db->db_offset;
479 		tocpy = (int)MIN(db->db_size - bufoff, size);
480 
481 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
482 
483 		if (tocpy == db->db_size)
484 			dmu_buf_will_fill(db, tx);
485 		else
486 			dmu_buf_will_dirty(db, tx);
487 
488 		/*
489 		 * XXX uiomove could block forever (eg. nfs-backed
490 		 * pages).  There needs to be a uiolockdown() function
491 		 * to lock the pages in memory, so that uiomove won't
492 		 * block.
493 		 */
494 		err = uiomove((char *)db->db_data + bufoff, tocpy,
495 		    UIO_WRITE, uio);
496 
497 		if (tocpy == db->db_size)
498 			dmu_buf_fill_done(db, tx);
499 
500 		if (err)
501 			break;
502 
503 		offset += tocpy;
504 		size -= tocpy;
505 	}
506 	dmu_buf_rele_array(dbp, numbufs, FTAG);
507 	return (err);
508 }
509 
510 int
511 dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
512     page_t *pp, dmu_tx_t *tx)
513 {
514 	dmu_buf_t **dbp;
515 	int numbufs, i;
516 	int err;
517 
518 	if (size == 0)
519 		return (0);
520 
521 	err = dmu_buf_hold_array(os, object, offset, size,
522 	    FALSE, FTAG, &numbufs, &dbp);
523 	if (err)
524 		return (err);
525 
526 	for (i = 0; i < numbufs; i++) {
527 		int tocpy, copied, thiscpy;
528 		int bufoff;
529 		dmu_buf_t *db = dbp[i];
530 		caddr_t va;
531 
532 		ASSERT(size > 0);
533 		ASSERT3U(db->db_size, >=, PAGESIZE);
534 
535 		bufoff = offset - db->db_offset;
536 		tocpy = (int)MIN(db->db_size - bufoff, size);
537 
538 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
539 
540 		if (tocpy == db->db_size)
541 			dmu_buf_will_fill(db, tx);
542 		else
543 			dmu_buf_will_dirty(db, tx);
544 
545 		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
546 			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
547 			thiscpy = MIN(PAGESIZE, tocpy - copied);
548 			va = ppmapin(pp, PROT_READ, (caddr_t)-1);
549 			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
550 			ppmapout(va);
551 			pp = pp->p_next;
552 			bufoff += PAGESIZE;
553 		}
554 
555 		if (tocpy == db->db_size)
556 			dmu_buf_fill_done(db, tx);
557 
558 		if (err)
559 			break;
560 
561 		offset += tocpy;
562 		size -= tocpy;
563 	}
564 	dmu_buf_rele_array(dbp, numbufs, FTAG);
565 	return (err);
566 }
567 #endif
568 
569 typedef struct {
570 	uint64_t	txg;
571 	dmu_buf_impl_t	*db;
572 	dmu_sync_cb_t	*done;
573 	void		*arg;
574 } dmu_sync_cbin_t;
575 
576 typedef union {
577 	dmu_sync_cbin_t	data;
578 	blkptr_t	blk;
579 } dmu_sync_cbarg_t;
580 
581 /* ARGSUSED */
582 static void
583 dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
584 {
585 	dmu_sync_cbin_t *in = (dmu_sync_cbin_t *)varg;
586 	dmu_buf_impl_t *db = in->db;
587 	uint64_t txg = in->txg;
588 	dmu_sync_cb_t *done = in->done;
589 	void *arg = in->arg;
590 	blkptr_t *blk = (blkptr_t *)varg;
591 
592 	if (!BP_IS_HOLE(zio->io_bp)) {
593 		zio->io_bp->blk_fill = 1;
594 		BP_SET_TYPE(zio->io_bp, db->db_dnode->dn_type);
595 		BP_SET_LEVEL(zio->io_bp, 0);
596 	}
597 
598 	*blk = *zio->io_bp; /* structure assignment */
599 
600 	mutex_enter(&db->db_mtx);
601 	ASSERT(db->db_d.db_overridden_by[txg&TXG_MASK] == IN_DMU_SYNC);
602 	db->db_d.db_overridden_by[txg&TXG_MASK] = blk;
603 	cv_broadcast(&db->db_changed);
604 	mutex_exit(&db->db_mtx);
605 
606 	if (done)
607 		done(&(db->db), arg);
608 }
609 
610 /*
611  * Intent log support: sync the block associated with db to disk.
612  * N.B. and XXX: the caller is responsible for making sure that the
613  * data isn't changing while dmu_sync() is writing it.
614  *
615  * Return values:
616  *
617  *	EEXIST: this txg has already been synced, so there's nothing to to.
618  *		The caller should not log the write.
619  *
620  *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
621  *		The caller should not log the write.
622  *
623  *	EALREADY: this block is already in the process of being synced.
624  *		The caller should track its progress (somehow).
625  *
626  *	EINPROGRESS: the IO has been initiated.
627  *		The caller should log this blkptr in the callback.
628  *
629  *	0: completed.  Sets *bp to the blkptr just written.
630  *		The caller should log this blkptr immediately.
631  */
632 int
633 dmu_sync(zio_t *pio, dmu_buf_t *db_fake,
634     blkptr_t *bp, uint64_t txg, dmu_sync_cb_t *done, void *arg)
635 {
636 	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
637 	objset_impl_t *os = db->db_objset;
638 	dsl_pool_t *dp = os->os_dsl_dataset->ds_dir->dd_pool;
639 	tx_state_t *tx = &dp->dp_tx;
640 	dmu_sync_cbin_t *in;
641 	blkptr_t *blk;
642 	zbookmark_t zb;
643 	uint32_t arc_flag;
644 	int err;
645 
646 	ASSERT(BP_IS_HOLE(bp));
647 	ASSERT(txg != 0);
648 
649 
650 	dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
651 	    txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);
652 
653 	/*
654 	 * XXX - would be nice if we could do this without suspending...
655 	 */
656 	txg_suspend(dp);
657 
658 	/*
659 	 * If this txg already synced, there's nothing to do.
660 	 */
661 	if (txg <= tx->tx_synced_txg) {
662 		txg_resume(dp);
663 		/*
664 		 * If we're running ziltest, we need the blkptr regardless.
665 		 */
666 		if (txg > spa_freeze_txg(dp->dp_spa)) {
667 			/* if db_blkptr == NULL, this was an empty write */
668 			if (db->db_blkptr)
669 				*bp = *db->db_blkptr; /* structure assignment */
670 			return (0);
671 		}
672 		return (EEXIST);
673 	}
674 
675 	mutex_enter(&db->db_mtx);
676 
677 	blk = db->db_d.db_overridden_by[txg&TXG_MASK];
678 	if (blk == IN_DMU_SYNC) {
679 		/*
680 		 * We have already issued a sync write for this buffer.
681 		 */
682 		mutex_exit(&db->db_mtx);
683 		txg_resume(dp);
684 		return (EALREADY);
685 	} else if (blk != NULL) {
686 		/*
687 		 * This buffer had already been synced.  It could not
688 		 * have been dirtied since, or we would have cleared blk.
689 		 */
690 		*bp = *blk; /* structure assignment */
691 		mutex_exit(&db->db_mtx);
692 		txg_resume(dp);
693 		return (0);
694 	}
695 
696 	if (txg == tx->tx_syncing_txg) {
697 		while (db->db_data_pending) {
698 			/*
699 			 * IO is in-progress.  Wait for it to finish.
700 			 * XXX - would be nice to be able to somehow "attach"
701 			 * this zio to the parent zio passed in.
702 			 */
703 			cv_wait(&db->db_changed, &db->db_mtx);
704 			if (!db->db_data_pending &&
705 			    db->db_blkptr && BP_IS_HOLE(db->db_blkptr)) {
706 				/*
707 				 * IO was compressed away
708 				 */
709 				*bp = *db->db_blkptr; /* structure assignment */
710 				mutex_exit(&db->db_mtx);
711 				txg_resume(dp);
712 				return (0);
713 			}
714 			ASSERT(db->db_data_pending ||
715 			    (db->db_blkptr && db->db_blkptr->blk_birth == txg));
716 		}
717 
718 		if (db->db_blkptr && db->db_blkptr->blk_birth == txg) {
719 			/*
720 			 * IO is already completed.
721 			 */
722 			*bp = *db->db_blkptr; /* structure assignment */
723 			mutex_exit(&db->db_mtx);
724 			txg_resume(dp);
725 			return (0);
726 		}
727 	}
728 
729 	if (db->db_d.db_data_old[txg&TXG_MASK] == NULL) {
730 		/*
731 		 * This dbuf isn't dirty, must have been free_range'd.
732 		 * There's no need to log writes to freed blocks, so we're done.
733 		 */
734 		mutex_exit(&db->db_mtx);
735 		txg_resume(dp);
736 		return (ENOENT);
737 	}
738 
739 	ASSERT(db->db_d.db_overridden_by[txg&TXG_MASK] == NULL);
740 	db->db_d.db_overridden_by[txg&TXG_MASK] = IN_DMU_SYNC;
741 	/*
742 	 * XXX - a little ugly to stash the blkptr in the callback
743 	 * buffer.  We always need to make sure the following is true:
744 	 * ASSERT(sizeof(blkptr_t) >= sizeof(dmu_sync_cbin_t));
745 	 */
746 	in = kmem_alloc(sizeof (blkptr_t), KM_SLEEP);
747 	in->db = db;
748 	in->txg = txg;
749 	in->done = done;
750 	in->arg = arg;
751 	mutex_exit(&db->db_mtx);
752 	txg_resume(dp);
753 
754 	arc_flag = pio == NULL ? ARC_WAIT : ARC_NOWAIT;
755 	zb.zb_objset = os->os_dsl_dataset->ds_object;
756 	zb.zb_object = db->db.db_object;
757 	zb.zb_level = db->db_level;
758 	zb.zb_blkid = db->db_blkid;
759 	err = arc_write(pio, os->os_spa,
760 	    zio_checksum_select(db->db_dnode->dn_checksum, os->os_checksum),
761 	    zio_compress_select(db->db_dnode->dn_compress, os->os_compress),
762 	    dmu_get_replication_level(os->os_spa, &zb, db->db_dnode->dn_type),
763 	    txg, bp, db->db_d.db_data_old[txg&TXG_MASK], dmu_sync_done, in,
764 	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, arc_flag, &zb);
765 	ASSERT(err == 0);
766 
767 	return (arc_flag == ARC_NOWAIT ? EINPROGRESS : 0);
768 }
769 
770 int
771 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
772 	dmu_tx_t *tx)
773 {
774 	dnode_t *dn;
775 	int err;
776 
777 	err = dnode_hold(os->os, object, FTAG, &dn);
778 	if (err)
779 		return (err);
780 	err = dnode_set_blksz(dn, size, ibs, tx);
781 	dnode_rele(dn, FTAG);
782 	return (err);
783 }
784 
785 void
786 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
787 	dmu_tx_t *tx)
788 {
789 	dnode_t *dn;
790 
791 	/* XXX assumes dnode_hold will not get an i/o error */
792 	(void) dnode_hold(os->os, object, FTAG, &dn);
793 	ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
794 	dn->dn_checksum = checksum;
795 	dnode_setdirty(dn, tx);
796 	dnode_rele(dn, FTAG);
797 }
798 
799 void
800 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
801 	dmu_tx_t *tx)
802 {
803 	dnode_t *dn;
804 
805 	/* XXX assumes dnode_hold will not get an i/o error */
806 	(void) dnode_hold(os->os, object, FTAG, &dn);
807 	ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
808 	dn->dn_compress = compress;
809 	dnode_setdirty(dn, tx);
810 	dnode_rele(dn, FTAG);
811 }
812 
813 /*
814  * XXX - eventually, this should take into account per-dataset (or
815  *       even per-object?) user requests for higher levels of replication.
816  */
817 int
818 dmu_get_replication_level(spa_t *spa, zbookmark_t *zb, dmu_object_type_t ot)
819 {
820 	int ncopies = 1;
821 
822 	if (dmu_ot[ot].ot_metadata)
823 		ncopies++;
824 	if (zb->zb_level != 0)
825 		ncopies++;
826 	if (zb->zb_objset == 0 && zb->zb_object == 0)
827 		ncopies++;
828 	return (MIN(ncopies, spa_max_replication(spa)));
829 }
830 
831 int
832 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
833 {
834 	dnode_t *dn;
835 	int i, err;
836 
837 	err = dnode_hold(os->os, object, FTAG, &dn);
838 	if (err)
839 		return (err);
840 	/*
841 	 * Sync any current changes before
842 	 * we go trundling through the block pointers.
843 	 */
844 	for (i = 0; i < TXG_SIZE; i++) {
845 		if (list_link_active(&dn->dn_dirty_link[i]))
846 			break;
847 	}
848 	if (i != TXG_SIZE) {
849 		dnode_rele(dn, FTAG);
850 		txg_wait_synced(dmu_objset_pool(os), 0);
851 		err = dnode_hold(os->os, object, FTAG, &dn);
852 		if (err)
853 			return (err);
854 	}
855 
856 	err = dnode_next_offset(dn, hole, off, 1, 1, 0);
857 	dnode_rele(dn, FTAG);
858 
859 	return (err);
860 }
861 
862 void
863 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
864 {
865 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
866 	mutex_enter(&dn->dn_mtx);
867 
868 	doi->doi_data_block_size = dn->dn_datablksz;
869 	doi->doi_metadata_block_size = dn->dn_indblkshift ?
870 	    1ULL << dn->dn_indblkshift : 0;
871 	doi->doi_indirection = dn->dn_nlevels;
872 	doi->doi_checksum = dn->dn_checksum;
873 	doi->doi_compress = dn->dn_compress;
874 	doi->doi_physical_blks = (DN_USED_BYTES(dn->dn_phys) +
875 	    SPA_MINBLOCKSIZE/2) >> SPA_MINBLOCKSHIFT;
876 	doi->doi_max_block_offset = dn->dn_phys->dn_maxblkid;
877 	doi->doi_type = dn->dn_type;
878 	doi->doi_bonus_size = dn->dn_bonuslen;
879 	doi->doi_bonus_type = dn->dn_bonustype;
880 
881 	mutex_exit(&dn->dn_mtx);
882 	rw_exit(&dn->dn_struct_rwlock);
883 }
884 
885 /*
886  * Get information on a DMU object.
887  * If doi is NULL, just indicates whether the object exists.
888  */
889 int
890 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
891 {
892 	dnode_t *dn;
893 	int err = dnode_hold(os->os, object, FTAG, &dn);
894 
895 	if (err)
896 		return (err);
897 
898 	if (doi != NULL)
899 		dmu_object_info_from_dnode(dn, doi);
900 
901 	dnode_rele(dn, FTAG);
902 	return (0);
903 }
904 
905 /*
906  * As above, but faster; can be used when you have a held dbuf in hand.
907  */
908 void
909 dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
910 {
911 	dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
912 }
913 
914 /*
915  * Faster still when you only care about the size.
916  * This is specifically optimized for zfs_getattr().
917  */
918 void
919 dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
920 {
921 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
922 
923 	*blksize = dn->dn_datablksz;
924 	/* add 1 for dnode space */
925 	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
926 	    SPA_MINBLOCKSHIFT) + 1;
927 }
928 
/*
 * Byteswap, in place, a buffer of `size' bytes viewed as an array of
 * 64-bit integers.  `size' must be a multiple of 8.
 */
void
byteswap_uint64_array(void *vbuf, size_t size)
{
	uint64_t *buf = vbuf;
	size_t count = size >> 3;
	size_t i;	/* same type as count: no signed/unsigned compare */

	ASSERT((size & 7) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_64(buf[i]);
}
941 
/*
 * Byteswap, in place, a buffer of `size' bytes viewed as an array of
 * 32-bit integers.  `size' must be a multiple of 4.
 */
void
byteswap_uint32_array(void *vbuf, size_t size)
{
	uint32_t *buf = vbuf;
	size_t count = size >> 2;
	size_t i;	/* same type as count: no signed/unsigned compare */

	ASSERT((size & 3) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_32(buf[i]);
}
954 
/*
 * Byteswap, in place, a buffer of `size' bytes viewed as an array of
 * 16-bit integers.  `size' must be a multiple of 2.
 */
void
byteswap_uint16_array(void *vbuf, size_t size)
{
	uint16_t *buf = vbuf;
	size_t count = size >> 1;
	size_t i;	/* same type as count: no signed/unsigned compare */

	ASSERT((size & 1) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_16(buf[i]);
}
967 
/*
 * Byteswap routine for byte arrays: single bytes have no byte order, so
 * there is nothing to do.  It exists so that byte-array types can share
 * the same function signature as the other byteswap routines.
 */
/* ARGSUSED */
void
byteswap_uint8_array(void *vbuf, size_t size)
{
	/* intentionally empty */
}
973 
/*
 * Initialize the subsystems used by the DMU: dbuf, dnode, then ARC.
 * dmu_fini() undoes this in the opposite order.
 */
void
dmu_init(void)
{
	dbuf_init();
	dnode_init();
	arc_init();
}
981 
/*
 * Tear down what dmu_init() set up, in reverse order: ARC, dnode, then
 * dbuf.
 */
void
dmu_fini(void)
{
	arc_fini();
	dnode_fini();
	dbuf_fini();
}
989