/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 */
/*
 * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
 * Copyright 2019 Joyent, Inc.
 * Copyright 2016 Nexenta Systems, Inc. All rights reserved.
 * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
 * Copyright (c) 2018 DilOS
 */

#include <sys/dmu.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dbuf.h>
#include <sys/dnode.h>
#include <sys/zfs_context.h>
#include <sys/dmu_objset.h>
#include <sys/dmu_traverse.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/dsl_prop.h>
#include <sys/dmu_zfetch.h>
#include <sys/zfs_ioctl.h>
#include <sys/zap.h>
#include <sys/zio_checksum.h>
#include <sys/zio_compress.h>
#include <sys/sa.h>
#include <sys/zfeature.h>
#include <sys/abd.h>
#ifdef _KERNEL
#include <sys/vmsystm.h>
#include <sys/zfs_znode.h>
#endif

static xuio_stats_t xuio_stats = {
	{ "onloan_read_buf",	KSTAT_DATA_UINT64 },
	{ "onloan_write_buf",	KSTAT_DATA_UINT64 },
	{ "read_buf_copied",	KSTAT_DATA_UINT64 },
	{ "read_buf_nocopy",	KSTAT_DATA_UINT64 },
	{ "write_buf_copied",	KSTAT_DATA_UINT64 },
	{ "write_buf_nocopy",	KSTAT_DATA_UINT64 }
};

#define	XUIOSTAT_INCR(stat, val)	\
	atomic_add_64(&xuio_stats.stat.value.ui64, (val))
#define	XUIOSTAT_BUMP(stat)	XUIOSTAT_INCR(stat, 1)

/*
 * Enable/disable nopwrite feature.
 */
int zfs_nopwrite_enabled = 1;

/*
 * Tunable to control percentage of dirtied blocks from frees in one TXG.
 * After this threshold is crossed, additional dirty blocks from frees
 * wait until the next TXG.
 * A value of zero will disable this throttle.
 */
uint32_t zfs_per_txg_dirty_frees_percent = 30;

/*
 * This can be used for testing, to ensure that certain actions happen
 * while in the middle of a remap (which might otherwise complete too
 * quickly).
 */
int zfs_object_remap_one_indirect_delay_ticks = 0;

/*
 * Limit the amount we can prefetch with one call to this amount.  This
 * helps to limit the amount of memory that can be used by prefetching.
 * Larger objects should be prefetched a bit at a time.
 */
uint64_t dmu_prefetch_max = 8 * SPA_MAXBLOCKSIZE;

const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
	{ DMU_BSWAP_UINT8,  TRUE,  FALSE, FALSE, "unallocated"		},
	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,  FALSE, "object directory"	},
	{ DMU_BSWAP_UINT64, TRUE,  TRUE,  FALSE, "object array"		},
	{ DMU_BSWAP_UINT8,  TRUE,  FALSE, FALSE, "packed nvlist"	},
	{ DMU_BSWAP_UINT64, TRUE,  FALSE, FALSE, "packed nvlist size"	},
	{ DMU_BSWAP_UINT64, TRUE,  FALSE, FALSE, "bpobj"	},
	{ DMU_BSWAP_UINT64, TRUE,  FALSE, FALSE, "bpobj header"		},
	{ DMU_BSWAP_UINT64, TRUE,  FALSE, FALSE, "SPA space map header"	},
	{ DMU_BSWAP_UINT64, TRUE,  FALSE, FALSE, "SPA space map"	},
	{ DMU_BSWAP_UINT64, TRUE,  FALSE, TRUE,  "ZIL intent log"	},
	{ DMU_BSWAP_DNODE,  TRUE,  FALSE, TRUE,  "DMU dnode"	},
	{ DMU_BSWAP_OBJSET, TRUE,  TRUE,  FALSE, "DMU objset"	},
	{ DMU_BSWAP_UINT64, TRUE,  TRUE,  FALSE, "DSL directory"	},
	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,  FALSE, "DSL directory child map" },
	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,  FALSE, "DSL dataset snap map"	},
	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,  FALSE, "DSL props"	},
	{ DMU_BSWAP_UINT64, TRUE,  TRUE,  FALSE, "DSL dataset"	},
	{ DMU_BSWAP_ZNODE,  TRUE,  FALSE, FALSE, "ZFS znode"	},
	{ DMU_BSWAP_OLDACL, TRUE,  FALSE, TRUE,  "ZFS V0 ACL"	},
	{ DMU_BSWAP_UINT8,  FALSE, FALSE, TRUE,  "ZFS plain file"	},
	{ DMU_BSWAP_ZAP,    TRUE,  FALSE, TRUE,  "ZFS directory"	},
	{ DMU_BSWAP_ZAP,    TRUE,  FALSE, FALSE, "ZFS master node"	},
	{ DMU_BSWAP_ZAP,    TRUE,  FALSE, TRUE,  "ZFS delete queue"	},
	{ DMU_BSWAP_UINT8,  FALSE, FALSE, TRUE,  "zvol object"	},
	{ DMU_BSWAP_ZAP,    TRUE,  FALSE, FALSE, "zvol prop"	},
	{ DMU_BSWAP_UINT8,  FALSE, FALSE, TRUE,  "other uint8[]"	},
	{ DMU_BSWAP_UINT64, FALSE, FALSE, TRUE,  "other uint64[]"	},
	{ DMU_BSWAP_ZAP,    TRUE,  FALSE, FALSE, "other ZAP"	},
	{ DMU_BSWAP_ZAP,    TRUE,  FALSE, FALSE, "persistent error log"	},
	{ DMU_BSWAP_UINT8,  TRUE,  FALSE, FALSE, "SPA history"	},
	{ DMU_BSWAP_UINT64, TRUE,  FALSE, FALSE, "SPA history offsets"	},
	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,  FALSE, "Pool properties"	},
	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,  FALSE, "DSL permissions"	},
	{ DMU_BSWAP_ACL,    TRUE,  FALSE, TRUE,  "ZFS ACL"	},
	{ DMU_BSWAP_UINT8,  TRUE,  FALSE, TRUE,  "ZFS SYSACL"	},
	{ DMU_BSWAP_UINT8,  TRUE,  FALSE, TRUE,  "FUID table"	},
	{ DMU_BSWAP_UINT64, TRUE,  FALSE, FALSE, "FUID table size"	},
	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,  FALSE, "DSL dataset next clones" },
	{ DMU_BSWAP_ZAP,    TRUE,  FALSE, FALSE, "scan work queue"	},
	{ DMU_BSWAP_ZAP,    TRUE,  FALSE, TRUE,  "ZFS user/group/project used"},
	{ DMU_BSWAP_ZAP,    TRUE,  FALSE, TRUE,  "ZFS user/group/proj quota"},
	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,  FALSE, "snapshot refcount tags" },
	{ DMU_BSWAP_ZAP,    TRUE,  FALSE, FALSE, "DDT ZAP algorithm"	},
	{ DMU_BSWAP_ZAP,    TRUE,  FALSE, FALSE, "DDT statistics"	},
	{ DMU_BSWAP_UINT8,  TRUE,  FALSE, TRUE,  "System attributes"	},
	{ DMU_BSWAP_ZAP,    TRUE,  FALSE, TRUE,  "SA master node"	},
	{ DMU_BSWAP_ZAP,    TRUE,  FALSE, TRUE,  "SA attr registration"	},
	{ DMU_BSWAP_ZAP,    TRUE,  FALSE, TRUE,  "SA attr layouts"	},
	{ DMU_BSWAP_ZAP,    TRUE,  FALSE, FALSE, "scan translations"	},
	{ DMU_BSWAP_UINT8,  FALSE, FALSE, TRUE,  "deduplicated block"	},
	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,  FALSE, "DSL deadlist map" },
	{ DMU_BSWAP_UINT64, TRUE,  TRUE,  FALSE, "DSL deadlist map hdr"	},
	{ DMU_BSWAP_ZAP,    TRUE,  TRUE,  FALSE, "DSL dir clones"	},
	{ DMU_BSWAP_UINT64, TRUE,  FALSE, FALSE, "bpobj subobj"		}
};

const dmu_object_byteswap_info_t dmu_ot_byteswap[DMU_BSWAP_NUMFUNCS] = {
	{	byteswap_uint8_array,	"uint8"		},
	{	byteswap_uint16_array,	"uint16"	},
	{	byteswap_uint32_array,	"uint32"	},
	{	byteswap_uint64_array,	"uint64"	},
	{	zap_byteswap,		"zap"		},
	{	dnode_buf_byteswap,	"dnode"		},
	{	dmu_objset_byteswap,	"objset"	},
	{	zfs_znode_byteswap,	"znode"		},
	{	zfs_oldacl_byteswap,	"oldacl"	},
	{	zfs_acl_byteswap,	"acl"		}
};

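/*
 * Hold the dbuf that covers 'offset' in the given dnode (or in the
 * <os, object> variant below) without initiating a read of its contents.
 * On success the dbuf is returned in *dbp with a hold that the caller
 * must release with dmu_buf_rele().
 */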
int
dmu_buf_hold_noread_by_dnode(dnode_t *dn, uint64_t offset,
    void *tag, dmu_buf_t **dbp)
{
	uint64_t blkid;
	dmu_buf_impl_t *db;

	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	blkid = dbuf_whichblock(dn, 0, offset);
	db = dbuf_hold(dn, blkid, tag);
	rw_exit(&dn->dn_struct_rwlock);

	if (db == NULL) {
		*dbp = NULL;
		return (SET_ERROR(EIO));
	}

	*dbp = &db->db;
	return (0);
}

int
dmu_buf_hold_noread(objset_t *os, uint64_t object, uint64_t offset,
    void *tag, dmu_buf_t **dbp)
{
	dnode_t *dn;
	uint64_t blkid;
	dmu_buf_impl_t *db;
	int err;

	err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);
	blkid = dbuf_whichblock(dn, 0, offset);
	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	db = dbuf_hold(dn, blkid, tag);
	rw_exit(&dn->dn_struct_rwlock);
	dnode_rele(dn, FTAG);

	if (db == NULL) {
		*dbp = NULL;
		return (SET_ERROR(EIO));
	}

	*dbp = &db->db;
	return (err);
}

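/*
 * As above, but also read the block in (honoring the DMU_READ_NO_PREFETCH
 * and DMU_READ_NO_DECRYPT flags) so that db->db_data is valid on return.
 * A typical caller pattern is, roughly:
 *
 *	dmu_buf_t *db;
 *	if (dmu_buf_hold(os, object, offset, FTAG, &db, flags) == 0) {
 *		... use db->db_data ...
 *		dmu_buf_rele(db, FTAG);
 *	}
 */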
int
dmu_buf_hold_by_dnode(dnode_t *dn, uint64_t offset,
    void *tag, dmu_buf_t **dbp, int flags)
{
	int err;
	int db_flags = DB_RF_CANFAIL;

	if (flags & DMU_READ_NO_PREFETCH)
		db_flags |= DB_RF_NOPREFETCH;
	if (flags & DMU_READ_NO_DECRYPT)
		db_flags |= DB_RF_NO_DECRYPT;

	err = dmu_buf_hold_noread_by_dnode(dn, offset, tag, dbp);
	if (err == 0) {
		dmu_buf_impl_t *db = (dmu_buf_impl_t *)(*dbp);
		err = dbuf_read(db, NULL, db_flags);
		if (err != 0) {
			dbuf_rele(db, tag);
			*dbp = NULL;
		}
	}

	return (err);
}

int
dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
    void *tag, dmu_buf_t **dbp, int flags)
{
	int err;
	int db_flags = DB_RF_CANFAIL;

	if (flags & DMU_READ_NO_PREFETCH)
		db_flags |= DB_RF_NOPREFETCH;
	if (flags & DMU_READ_NO_DECRYPT)
		db_flags |= DB_RF_NO_DECRYPT;

	err = dmu_buf_hold_noread(os, object, offset, tag, dbp);
	if (err == 0) {
		dmu_buf_impl_t *db = (dmu_buf_impl_t *)(*dbp);
		err = dbuf_read(db, NULL, db_flags);
		if (err != 0) {
			dbuf_rele(db, tag);
			*dbp = NULL;
		}
	}

	return (err);
}

int
dmu_bonus_max(void)
{
	return (DN_OLD_MAX_BONUSLEN);
}

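/*
 * Resize the bonus buffer of the provided bonus dbuf.  The new size must
 * fit within the space already allocated to the bonus buffer.
 */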
int
dmu_set_bonus(dmu_buf_t *db_fake, int newsize, dmu_tx_t *tx)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
	dnode_t *dn;
	int error;

	DB_DNODE_ENTER(db);
	dn = DB_DNODE(db);

	if (dn->dn_bonus != db) {
		error = SET_ERROR(EINVAL);
	} else if (newsize < 0 || newsize > db_fake->db_size) {
		error = SET_ERROR(EINVAL);
	} else {
		dnode_setbonuslen(dn, newsize, tx);
		error = 0;
	}

	DB_DNODE_EXIT(db);
	return (error);
}

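/*
 * Change the type of the bonus buffer of the provided bonus dbuf.
 */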
int
dmu_set_bonustype(dmu_buf_t *db_fake, dmu_object_type_t type, dmu_tx_t *tx)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
	dnode_t *dn;
	int error;

	DB_DNODE_ENTER(db);
	dn = DB_DNODE(db);

	if (!DMU_OT_IS_VALID(type)) {
		error = SET_ERROR(EINVAL);
	} else if (dn->dn_bonus != db) {
		error = SET_ERROR(EINVAL);
	} else {
		dnode_setbonus_type(dn, type, tx);
		error = 0;
	}

	DB_DNODE_EXIT(db);
	return (error);
}

dmu_object_type_t
dmu_get_bonustype(dmu_buf_t *db_fake)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
	dnode_t *dn;
	dmu_object_type_t type;

	DB_DNODE_ENTER(db);
	dn = DB_DNODE(db);
	type = dn->dn_bonustype;
	DB_DNODE_EXIT(db);

	return (type);
}

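/*
 * Remove the spill block (and its dbuf) from the given object.
 */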
int
dmu_rm_spill(objset_t *os, uint64_t object, dmu_tx_t *tx)
{
	dnode_t *dn;
	int error;

	error = dnode_hold(os, object, FTAG, &dn);
	if (error != 0)
		return (error);
	dbuf_rm_spill(dn, tx);
	rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
	dnode_rm_spill(dn, tx);
	rw_exit(&dn->dn_struct_rwlock);
	dnode_rele(dn, FTAG);
	return (error);
}

/*
 * Lookup and hold the bonus buffer for the provided dnode.  If the dnode
 * has not yet been allocated a bonus dbuf, a new one will be allocated.
 * Returns ENOENT, EIO, or 0.
 */
int
dmu_bonus_hold_by_dnode(dnode_t *dn, void *tag, dmu_buf_t **dbp,
    uint32_t flags)
{
	dmu_buf_impl_t *db;
	int error;
	uint32_t db_flags = DB_RF_MUST_SUCCEED;

	if (flags & DMU_READ_NO_PREFETCH)
		db_flags |= DB_RF_NOPREFETCH;
	if (flags & DMU_READ_NO_DECRYPT)
		db_flags |= DB_RF_NO_DECRYPT;

	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	if (dn->dn_bonus == NULL) {
		rw_exit(&dn->dn_struct_rwlock);
		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
		if (dn->dn_bonus == NULL)
			dbuf_create_bonus(dn);
	}
	db = dn->dn_bonus;

	/* as long as the bonus buf is held, the dnode will be held */
	if (zfs_refcount_add(&db->db_holds, tag) == 1) {
		VERIFY(dnode_add_ref(dn, db));
		atomic_inc_32(&dn->dn_dbufs_count);
	}

	/*
	 * Wait to drop dn_struct_rwlock until after adding the bonus dbuf's
	 * hold and incrementing the dbuf count to ensure that dnode_move() sees
	 * a dnode hold for every dbuf.
	 */
	rw_exit(&dn->dn_struct_rwlock);

	error = dbuf_read(db, NULL, db_flags);
	if (error) {
		dnode_evict_bonus(dn);
		dbuf_rele(db, tag);
		*dbp = NULL;
		return (error);
	}

	*dbp = &db->db;
	return (0);
}

/*
 * returns ENOENT, EIO, or 0.
 */
int
dmu_bonus_hold_impl(objset_t *os, uint64_t object, void *tag, uint32_t flags,
    dmu_buf_t **dbp)
{
	dnode_t *dn;
	dmu_buf_impl_t *db;
	int error;
	uint32_t db_flags = DB_RF_MUST_SUCCEED;

	if (flags & DMU_READ_NO_PREFETCH)
		db_flags |= DB_RF_NOPREFETCH;
	if (flags & DMU_READ_NO_DECRYPT)
		db_flags |= DB_RF_NO_DECRYPT;

	error = dnode_hold(os, object, FTAG, &dn);
	if (error)
		return (error);

	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	if (dn->dn_bonus == NULL) {
		rw_exit(&dn->dn_struct_rwlock);
		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
		if (dn->dn_bonus == NULL)
			dbuf_create_bonus(dn);
	}
	db = dn->dn_bonus;

	/* as long as the bonus buf is held, the dnode will be held */
	if (zfs_refcount_add(&db->db_holds, tag) == 1) {
		VERIFY(dnode_add_ref(dn, db));
		atomic_inc_32(&dn->dn_dbufs_count);
	}

	/*
	 * Wait to drop dn_struct_rwlock until after adding the bonus dbuf's
	 * hold and incrementing the dbuf count to ensure that dnode_move() sees
	 * a dnode hold for every dbuf.
	 */
	rw_exit(&dn->dn_struct_rwlock);

	dnode_rele(dn, FTAG);

	error = dbuf_read(db, NULL, db_flags);
	if (error) {
		dnode_evict_bonus(dn);
		dbuf_rele(db, tag);
		*dbp = NULL;
		return (error);
	}

	*dbp = &db->db;
	return (0);
}

int
dmu_bonus_hold(objset_t *os, uint64_t obj, void *tag, dmu_buf_t **dbp)
{
	return (dmu_bonus_hold_impl(os, obj, tag, DMU_READ_NO_PREFETCH, dbp));
}

/*
 * returns ENOENT, EIO, or 0.
 *
 * This interface will allocate a blank spill dbuf when a spill blk
 * doesn't already exist on the dnode.
 *
 * if you only want to find an already existing spill db, then
 * dmu_spill_hold_existing() should be used.
 */
int
dmu_spill_hold_by_dnode(dnode_t *dn, uint32_t flags, void *tag, dmu_buf_t **dbp)
{
	dmu_buf_impl_t *db = NULL;
	int err;

	if ((flags & DB_RF_HAVESTRUCT) == 0)
		rw_enter(&dn->dn_struct_rwlock, RW_READER);

	db = dbuf_hold(dn, DMU_SPILL_BLKID, tag);

	if ((flags & DB_RF_HAVESTRUCT) == 0)
		rw_exit(&dn->dn_struct_rwlock);

	ASSERT(db != NULL);
	err = dbuf_read(db, NULL, flags);
	if (err == 0)
		*dbp = &db->db;
	else
		dbuf_rele(db, tag);
	return (err);
}

int
dmu_spill_hold_existing(dmu_buf_t *bonus, void *tag, dmu_buf_t **dbp)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)bonus;
	dnode_t *dn;
	int err;

	DB_DNODE_ENTER(db);
	dn = DB_DNODE(db);

	if (spa_version(dn->dn_objset->os_spa) < SPA_VERSION_SA) {
		err = SET_ERROR(EINVAL);
	} else {
		rw_enter(&dn->dn_struct_rwlock, RW_READER);

		if (!dn->dn_have_spill) {
			err = SET_ERROR(ENOENT);
		} else {
			err = dmu_spill_hold_by_dnode(dn,
			    DB_RF_HAVESTRUCT | DB_RF_CANFAIL, tag, dbp);
		}

		rw_exit(&dn->dn_struct_rwlock);
	}

	DB_DNODE_EXIT(db);
	return (err);
}

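/*
 * Hold the spill block for the object that the given bonus dbuf belongs
 * to, creating it if necessary (see dmu_spill_hold_by_dnode() above).
 */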
int
dmu_spill_hold_by_bonus(dmu_buf_t *bonus, uint32_t flags, void *tag,
    dmu_buf_t **dbp)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)bonus;
	dnode_t *dn;
	int err;
	uint32_t db_flags = DB_RF_CANFAIL;

	if (flags & DMU_READ_NO_DECRYPT)
		db_flags |= DB_RF_NO_DECRYPT;

	DB_DNODE_ENTER(db);
	dn = DB_DNODE(db);
	err = dmu_spill_hold_by_dnode(dn, db_flags, tag, dbp);
	DB_DNODE_EXIT(db);

	return (err);
}

/*
 * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
 * to take a held dnode rather than <os, object> -- the lookup is wasteful,
 * and can induce severe lock contention when writing to several files
 * whose dnodes are in the same block.
 */
int
dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset, uint64_t length,
    boolean_t read, void *tag, int *numbufsp, dmu_buf_t ***dbpp, uint32_t flags)
{
	dmu_buf_t **dbp;
	uint64_t blkid, nblks, i;
	uint32_t dbuf_flags;
	int err;
	zio_t *zio;

	ASSERT(length <= DMU_MAX_ACCESS);

	/*
	 * Note: We directly notify the prefetch code of this read, so that
	 * we can tell it about the multi-block read.  dbuf_read() only knows
	 * about the one block it is accessing.
	 */
	dbuf_flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT | DB_RF_HAVESTRUCT |
	    DB_RF_NOPREFETCH;

	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	if (dn->dn_datablkshift) {
		int blkshift = dn->dn_datablkshift;
		nblks = (P2ROUNDUP(offset + length, 1ULL << blkshift) -
		    P2ALIGN(offset, 1ULL << blkshift)) >> blkshift;
	} else {
		if (offset + length > dn->dn_datablksz) {
			zfs_panic_recover("zfs: accessing past end of object "
			    "%llx/%llx (size=%u access=%llu+%llu)",
			    (longlong_t)dn->dn_objset->
			    os_dsl_dataset->ds_object,
			    (longlong_t)dn->dn_object, dn->dn_datablksz,
			    (longlong_t)offset, (longlong_t)length);
			rw_exit(&dn->dn_struct_rwlock);
			return (SET_ERROR(EIO));
		}
		nblks = 1;
	}
	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);

	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, ZIO_FLAG_CANFAIL);
	blkid = dbuf_whichblock(dn, 0, offset);
	for (i = 0; i < nblks; i++) {
		dmu_buf_impl_t *db = dbuf_hold(dn, blkid + i, tag);
		if (db == NULL) {
			rw_exit(&dn->dn_struct_rwlock);
			dmu_buf_rele_array(dbp, nblks, tag);
			zio_nowait(zio);
			return (SET_ERROR(EIO));
		}

		/* initiate async i/o */
		if (read)
			(void) dbuf_read(db, zio, dbuf_flags);
		dbp[i] = &db->db;
	}

	if ((flags & DMU_READ_NO_PREFETCH) == 0 &&
	    DNODE_META_IS_CACHEABLE(dn) && length <= zfetch_array_rd_sz) {
		dmu_zfetch(&dn->dn_zfetch, blkid, nblks,
		    read && DNODE_IS_CACHEABLE(dn));
	}
	rw_exit(&dn->dn_struct_rwlock);

	/* wait for async i/o */
	err = zio_wait(zio);
	if (err) {
		dmu_buf_rele_array(dbp, nblks, tag);
		return (err);
	}

	/* wait for other io to complete */
	if (read) {
		for (i = 0; i < nblks; i++) {
			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
			mutex_enter(&db->db_mtx);
			while (db->db_state == DB_READ ||
			    db->db_state == DB_FILL)
				cv_wait(&db->db_changed, &db->db_mtx);
			if (db->db_state == DB_UNCACHED)
				err = SET_ERROR(EIO);
			mutex_exit(&db->db_mtx);
			if (err) {
				dmu_buf_rele_array(dbp, nblks, tag);
				return (err);
			}
		}
	}

	*numbufsp = nblks;
	*dbpp = dbp;
	return (0);
}

static int
dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
    uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
{
	dnode_t *dn;
	int err;

	err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);

	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
	    numbufsp, dbpp, DMU_READ_PREFETCH);

	dnode_rele(dn, FTAG);

	return (err);
}

int
dmu_buf_hold_array_by_bonus(dmu_buf_t *db_fake, uint64_t offset,
    uint64_t length, boolean_t read, void *tag, int *numbufsp,
    dmu_buf_t ***dbpp)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
	dnode_t *dn;
	int err;

	DB_DNODE_ENTER(db);
	dn = DB_DNODE(db);
	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
	    numbufsp, dbpp, DMU_READ_PREFETCH);
	DB_DNODE_EXIT(db);

	return (err);
}

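/*
 * Release the holds on an array of dbufs acquired by one of the
 * dmu_buf_hold_array_*() functions and free the array itself.
 */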
void
dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
{
	int i;
	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;

	if (numbufs == 0)
		return;

	for (i = 0; i < numbufs; i++) {
		if (dbp[i])
			dbuf_rele(dbp[i], tag);
	}

	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
}

/*
 * Issue prefetch i/os for the given blocks.  If level is greater than 0, the
 * indirect blocks prefetched will be those that point to the blocks containing
 * the data starting at offset, and continuing to offset + len.
 *
 * Note that if the indirect blocks above the blocks being prefetched are not
 * in cache, they will be asynchronously read in.
 */
void
dmu_prefetch(objset_t *os, uint64_t object, int64_t level, uint64_t offset,
    uint64_t len, zio_priority_t pri)
{
	dnode_t *dn;
	uint64_t blkid;
	int nblks, err;

	if (len == 0) {  /* they're interested in the bonus buffer */
		dn = DMU_META_DNODE(os);

		if (object == 0 || object >= DN_MAX_OBJECT)
			return;

		rw_enter(&dn->dn_struct_rwlock, RW_READER);
		blkid = dbuf_whichblock(dn, level,
		    object * sizeof (dnode_phys_t));
		dbuf_prefetch(dn, level, blkid, pri, 0);
		rw_exit(&dn->dn_struct_rwlock);
		return;
	}

	/*
	 * See comment before the definition of dmu_prefetch_max.
	 */
	len = MIN(len, dmu_prefetch_max);

	/*
	 * XXX - Note, if the dnode for the requested object is not
	 * already cached, we will do a *synchronous* read in the
	 * dnode_hold() call.  The same is true for any indirects.
	 */
	err = dnode_hold(os, object, FTAG, &dn);
	if (err != 0)
		return;

	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	/*
	 * offset + len - 1 is the last byte we want to prefetch for, and offset
	 * is the first.  Then dbuf_whichblock(dn, level, offset + len - 1) is
	 * the last block we want to prefetch, and dbuf_whichblock(dn, level,
	 * offset) is the first.  Then the number we need to prefetch is the
	 * last - first + 1.
	 */
	if (level > 0 || dn->dn_datablkshift != 0) {
		nblks = dbuf_whichblock(dn, level, offset + len - 1) -
		    dbuf_whichblock(dn, level, offset) + 1;
	} else {
		nblks = (offset < dn->dn_datablksz);
	}

	if (nblks != 0) {
		blkid = dbuf_whichblock(dn, level, offset);
		for (int i = 0; i < nblks; i++)
			dbuf_prefetch(dn, level, blkid + i, pri, 0);
	}

	rw_exit(&dn->dn_struct_rwlock);

	dnode_rele(dn, FTAG);
}

/*
 * Get the next "chunk" of file data to free.  We traverse the file from
 * the end so that the file gets shorter over time (if we crash in the
 * middle, this will leave us in a better state).  We find allocated file
 * data by simply searching the allocated level 1 indirects.
 *
 * On input, *start should be the first offset that does not need to be
 * freed (e.g. "offset + length").  On return, *start will be the first
 * offset that should be freed.
 */
static int
get_next_chunk(dnode_t *dn, uint64_t *start, uint64_t minimum)
{
	uint64_t maxblks = DMU_MAX_ACCESS >> (dn->dn_indblkshift + 1);
	/* bytes of data covered by a level-1 indirect block */
	uint64_t iblkrange =
	    dn->dn_datablksz * EPB(dn->dn_indblkshift, SPA_BLKPTRSHIFT);

	ASSERT3U(minimum, <=, *start);

	if (*start - minimum <= iblkrange * maxblks) {
		*start = minimum;
		return (0);
	}
	ASSERT(ISP2(iblkrange));

	for (uint64_t blks = 0; *start > minimum && blks < maxblks; blks++) {
		int err;

		/*
		 * dnode_next_offset(BACKWARDS) will find an allocated L1
		 * indirect block at or before the input offset.  We must
		 * decrement *start so that it is at the end of the region
		 * to search.
		 */
		(*start)--;
		err = dnode_next_offset(dn,
		    DNODE_FIND_BACKWARDS, start, 2, 1, 0);

		/* if there are no indirect blocks before start, we are done */
		if (err == ESRCH) {
			*start = minimum;
			break;
		} else if (err != 0) {
			return (err);
		}

		/* set start to the beginning of this L1 indirect */
		*start = P2ALIGN(*start, iblkrange);
	}
	if (*start < minimum)
		*start = minimum;
	return (0);
}

/*
 * If this objset is of type DMU_OST_ZFS, return true if the vfs's unmounted
 * flag is set; otherwise return false.
 * Used below in dmu_free_long_range_impl() to enable abort when unmounting.
 */
/*ARGSUSED*/
static boolean_t
dmu_objset_zfs_unmounting(objset_t *os)
{
#ifdef _KERNEL
	if (dmu_objset_type(os) == DMU_OST_ZFS)
		return (zfs_get_vfs_flag_unmounted(os));
#endif
	return (B_FALSE);
}

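/*
 * Free the given range of a (possibly very large) object one chunk at a
 * time, so that no single TXG is flooded with frees; each chunk gets its
 * own transaction.
 */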
static int
dmu_free_long_range_impl(objset_t *os, dnode_t *dn, uint64_t offset,
    uint64_t length)
{
	uint64_t object_size = (dn->dn_maxblkid + 1) * dn->dn_datablksz;
	int err;
	uint64_t dirty_frees_threshold;
	dsl_pool_t *dp = dmu_objset_pool(os);

	if (offset >= object_size)
		return (0);

	if (zfs_per_txg_dirty_frees_percent <= 100)
		dirty_frees_threshold =
		    zfs_per_txg_dirty_frees_percent * zfs_dirty_data_max / 100;
	else
		dirty_frees_threshold = zfs_dirty_data_max / 4;

	if (length == DMU_OBJECT_END || offset + length > object_size)
		length = object_size - offset;

	while (length != 0) {
		uint64_t chunk_end, chunk_begin, chunk_len;
		uint64_t long_free_dirty_all_txgs = 0;
		dmu_tx_t *tx;

		if (dmu_objset_zfs_unmounting(dn->dn_objset))
			return (SET_ERROR(EINTR));

		chunk_end = chunk_begin = offset + length;

		/* move chunk_begin backwards to the beginning of this chunk */
		err = get_next_chunk(dn, &chunk_begin, offset);
		if (err)
			return (err);
		ASSERT3U(chunk_begin, >=, offset);
		ASSERT3U(chunk_begin, <=, chunk_end);

		chunk_len = chunk_end - chunk_begin;

		mutex_enter(&dp->dp_lock);
		for (int t = 0; t < TXG_SIZE; t++) {
			long_free_dirty_all_txgs +=
			    dp->dp_long_free_dirty_pertxg[t];
		}
		mutex_exit(&dp->dp_lock);

		/*
		 * To avoid filling up a TXG with just frees, wait for
		 * the next TXG to open before freeing more chunks if
		 * we have reached the threshold of frees.
		 */
		if (dirty_frees_threshold != 0 &&
		    long_free_dirty_all_txgs >= dirty_frees_threshold) {
			txg_wait_open(dp, 0, B_TRUE);
			continue;
		}

		tx = dmu_tx_create(os);
		dmu_tx_hold_free(tx, dn->dn_object, chunk_begin, chunk_len);

		/*
		 * Mark this transaction as typically resulting in a net
		 * reduction in space used.
		 */
		dmu_tx_mark_netfree(tx);
		err = dmu_tx_assign(tx, TXG_WAIT);
		if (err) {
			dmu_tx_abort(tx);
			return (err);
		}

		mutex_enter(&dp->dp_lock);
		dp->dp_long_free_dirty_pertxg[dmu_tx_get_txg(tx) & TXG_MASK] +=
		    chunk_len;
		mutex_exit(&dp->dp_lock);
		DTRACE_PROBE3(free__long__range,
		    uint64_t, long_free_dirty_all_txgs, uint64_t, chunk_len,
		    uint64_t, dmu_tx_get_txg(tx));
		dnode_free_range(dn, chunk_begin, chunk_len, tx);

		dmu_tx_commit(tx);

		length -= chunk_len;
	}
	return (0);
}

int
dmu_free_long_range(objset_t *os, uint64_t object,
    uint64_t offset, uint64_t length)
{
	dnode_t *dn;
	int err;

	err = dnode_hold(os, object, FTAG, &dn);
	if (err != 0)
		return (err);
	err = dmu_free_long_range_impl(os, dn, offset, length);

	/*
	 * It is important to zero out the maxblkid when freeing the entire
	 * file, so that (a) subsequent calls to dmu_free_long_range_impl()
	 * will take the fast path, and (b) dnode_reallocate() can verify
	 * that the entire file has been freed.
	 */
	if (err == 0 && offset == 0 && length == DMU_OBJECT_END)
		dn->dn_maxblkid = 0;

	dnode_rele(dn, FTAG);
	return (err);
}

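/*
 * Free all of the data in the object and then free the object itself.
 */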
int
dmu_free_long_object(objset_t *os, uint64_t object)
{
	dmu_tx_t *tx;
	int err;

	err = dmu_free_long_range(os, object, 0, DMU_OBJECT_END);
	if (err != 0)
		return (err);

	tx = dmu_tx_create(os);
	dmu_tx_hold_bonus(tx, object);
	dmu_tx_hold_free(tx, object, 0, DMU_OBJECT_END);
	dmu_tx_mark_netfree(tx);
	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err == 0) {
		err = dmu_object_free(os, object, tx);
		dmu_tx_commit(tx);
	} else {
		dmu_tx_abort(tx);
	}

	return (err);
}

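/*
 * Free the given range of the object as part of the caller's
 * already-assigned transaction.
 */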
int
dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
    uint64_t size, dmu_tx_t *tx)
{
	dnode_t *dn;
	int err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);
	ASSERT(offset < UINT64_MAX);
	ASSERT(size == DMU_OBJECT_END || size <= UINT64_MAX - offset);
	dnode_free_range(dn, offset, size, tx);
	dnode_rele(dn, FTAG);
	return (0);
}

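/*
 * Read 'size' bytes starting at 'offset' from the given dnode into 'buf',
 * holding the needed dbufs in batches so the reads can proceed in parallel.
 */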
static int
dmu_read_impl(dnode_t *dn, uint64_t offset, uint64_t size,
    void *buf, uint32_t flags)
{
	dmu_buf_t **dbp;
	int numbufs, err = 0;

	/*
	 * Deal with odd block sizes, where there can't be data past the first
	 * block.  If we ever do the tail block optimization, we will need to
	 * handle that here as well.
	 */
	if (dn->dn_maxblkid == 0) {
		int newsz = offset > dn->dn_datablksz ? 0 :
		    MIN(size, dn->dn_datablksz - offset);
		bzero((char *)buf + newsz, size - newsz);
		size = newsz;
	}

	while (size > 0) {
		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
		int i;

		/*
		 * NB: we could do this block-at-a-time, but it's nice
		 * to be reading in parallel.
		 */
		err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
		    TRUE, FTAG, &numbufs, &dbp, flags);
		if (err)
			break;

		for (i = 0; i < numbufs; i++) {
			int tocpy;
			int bufoff;
			dmu_buf_t *db = dbp[i];

			ASSERT(size > 0);

			bufoff = offset - db->db_offset;
			tocpy = (int)MIN(db->db_size - bufoff, size);

			bcopy((char *)db->db_data + bufoff, buf, tocpy);

			offset += tocpy;
			size -= tocpy;
			buf = (char *)buf + tocpy;
		}
		dmu_buf_rele_array(dbp, numbufs, FTAG);
	}
	return (err);
}

int
dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
    void *buf, uint32_t flags)
{
	dnode_t *dn;
	int err;

	err = dnode_hold(os, object, FTAG, &dn);
	if (err != 0)
		return (err);

	err = dmu_read_impl(dn, offset, size, buf, flags);
	dnode_rele(dn, FTAG);
	return (err);
}

int
dmu_read_by_dnode(dnode_t *dn, uint64_t offset, uint64_t size, void *buf,
    uint32_t flags)
{
	return (dmu_read_impl(dn, offset, size, buf, flags));
}

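/*
 * Copy 'size' bytes from 'buf' into the already-held dbufs, dirtying (or
 * filling, for full-block writes) each dbuf in the caller's transaction.
 */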
static void
dmu_write_impl(dmu_buf_t **dbp, int numbufs, uint64_t offset, uint64_t size,
    const void *buf, dmu_tx_t *tx)
{
	int i;

	for (i = 0; i < numbufs; i++) {
		int tocpy;
		int bufoff;
		dmu_buf_t *db = dbp[i];

		ASSERT(size > 0);

		bufoff = offset - db->db_offset;
		tocpy = (int)MIN(db->db_size - bufoff, size);

		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);

		if (tocpy == db->db_size)
			dmu_buf_will_fill(db, tx);
		else
			dmu_buf_will_dirty(db, tx);

		bcopy(buf, (char *)db->db_data + bufoff, tocpy);

		if (tocpy == db->db_size)
			dmu_buf_fill_done(db, tx);

		offset += tocpy;
		size -= tocpy;
		buf = (char *)buf + tocpy;
	}
}

void
dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
    const void *buf, dmu_tx_t *tx)
{
	dmu_buf_t **dbp;
	int numbufs;

	if (size == 0)
		return;

	VERIFY0(dmu_buf_hold_array(os, object, offset, size,
	    FALSE, FTAG, &numbufs, &dbp));
	dmu_write_impl(dbp, numbufs, offset, size, buf, tx);
	dmu_buf_rele_array(dbp, numbufs, FTAG);
}

void
dmu_write_by_dnode(dnode_t *dn, uint64_t offset, uint64_t size,
    const void *buf, dmu_tx_t *tx)
{
	dmu_buf_t **dbp;
	int numbufs;

	if (size == 0)
		return;

	VERIFY0(dmu_buf_hold_array_by_dnode(dn, offset, size,
	    FALSE, FTAG, &numbufs, &dbp, DMU_READ_PREFETCH));
	dmu_write_impl(dbp, numbufs, offset, size, buf, tx);
	dmu_buf_rele_array(dbp, numbufs, FTAG);
}

static int
dmu_object_remap_one_indirect(objset_t *os, dnode_t *dn,
    uint64_t last_removal_txg, uint64_t offset)
{
	uint64_t l1blkid = dbuf_whichblock(dn, 1, offset);
	int err = 0;

	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	dmu_buf_impl_t *dbuf = dbuf_hold_level(dn, 1, l1blkid, FTAG);
	ASSERT3P(dbuf, !=, NULL);

	/*
	 * If the block hasn't been written yet, this default will ensure
	 * we don't try to remap it.
	 */
	uint64_t birth = UINT64_MAX;
	ASSERT3U(last_removal_txg, !=, UINT64_MAX);
	if (dbuf->db_blkptr != NULL)
		birth = dbuf->db_blkptr->blk_birth;
	rw_exit(&dn->dn_struct_rwlock);

	/*
	 * If this L1 was already written after the last removal, then we've
	 * already tried to remap it.
	 */
	if (birth <= last_removal_txg &&
	    dbuf_read(dbuf, NULL, DB_RF_MUST_SUCCEED) == 0 &&
	    dbuf_can_remap(dbuf)) {
		dmu_tx_t *tx = dmu_tx_create(os);
		dmu_tx_hold_remap_l1indirect(tx, dn->dn_object);
		err = dmu_tx_assign(tx, TXG_WAIT);
		if (err == 0) {
			(void) dbuf_dirty(dbuf, tx);
			dmu_tx_commit(tx);
		} else {
			dmu_tx_abort(tx);
		}
	}

	dbuf_rele(dbuf, FTAG);

	delay(zfs_object_remap_one_indirect_delay_ticks);

	return (err);
}

/*
 * Remap all blockpointers in the object, if possible, so that they reference
 * only concrete vdevs.
 *
 * To do this, iterate over the L0 blockpointers and remap any that reference
 * an indirect vdev. Note that we only examine L0 blockpointers; since we
 * cannot guarantee that we can remap all blockpointers anyway (due to split
 * blocks), we do not want to make the code unnecessarily complicated to
 * catch the unlikely case that there is an L1 block on an indirect vdev that
 * contains no indirect blockpointers.
 */
int
dmu_object_remap_indirects(objset_t *os, uint64_t object,
    uint64_t last_removal_txg)
{
	uint64_t offset, l1span;
	int err;
	dnode_t *dn;

	err = dnode_hold(os, object, FTAG, &dn);
	if (err != 0) {
		return (err);
	}

	if (dn->dn_nlevels <= 1) {
		if (issig(JUSTLOOKING) && issig(FORREAL)) {
			err = SET_ERROR(EINTR);
		}

		/*
		 * If the dnode has no indirect blocks, we cannot dirty them.
		 * We still want to remap the blkptr(s) in the dnode if
		 * appropriate, so mark it as dirty.
		 */
		if (err == 0 && dnode_needs_remap(dn)) {
			dmu_tx_t *tx = dmu_tx_create(os);
			dmu_tx_hold_bonus(tx, dn->dn_object);
			if ((err = dmu_tx_assign(tx, TXG_WAIT)) == 0) {
				dnode_setdirty(dn, tx);
				dmu_tx_commit(tx);
			} else {
				dmu_tx_abort(tx);
			}
		}

		dnode_rele(dn, FTAG);
		return (err);
	}

	offset = 0;
	l1span = 1ULL << (dn->dn_indblkshift - SPA_BLKPTRSHIFT +
	    dn->dn_datablkshift);
	/*
	 * Find the next L1 indirect that is not a hole.
	 */
	while (dnode_next_offset(dn, 0, &offset, 2, 1, 0) == 0) {
		if (issig(JUSTLOOKING) && issig(FORREAL)) {
			err = SET_ERROR(EINTR);
			break;
		}
		if ((err = dmu_object_remap_one_indirect(os, dn,
		    last_removal_txg, offset)) != 0) {
			break;
		}
		offset += l1span;
	}

	dnode_rele(dn, FTAG);
	return (err);
}

void
dmu_prealloc(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
    dmu_tx_t *tx)
{
	dmu_buf_t **dbp;
	int numbufs, i;

	if (size == 0)
		return;

	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
	    FALSE, FTAG, &numbufs, &dbp));

	for (i = 0; i < numbufs; i++) {
		dmu_buf_t *db = dbp[i];

		dmu_buf_will_not_fill(db, tx);
	}
	dmu_buf_rele_array(dbp, numbufs, FTAG);
}

void
dmu_write_embedded(objset_t *os, uint64_t object, uint64_t offset,
    void *data, uint8_t etype, uint8_t comp, int uncompressed_size,
    int compressed_size, int byteorder, dmu_tx_t *tx)
{
	dmu_buf_t *db;

	ASSERT3U(etype, <, NUM_BP_EMBEDDED_TYPES);
	ASSERT3U(comp, <, ZIO_COMPRESS_FUNCTIONS);
	VERIFY0(dmu_buf_hold_noread(os, object, offset,
	    FTAG, &db));

	dmu_buf_write_embedded(db,
	    data, (bp_embedded_type_t)etype, (enum zio_compress)comp,
	    uncompressed_size, compressed_size, byteorder, tx);

	dmu_buf_rele(db, FTAG);
}

/*
 * DMU support for xuio
 */
kstat_t *xuio_ksp = NULL;

int
dmu_xuio_init(xuio_t *xuio, int nblk)
{
	dmu_xuio_t *priv;
	uio_t *uio = &xuio->xu_uio;

	uio->uio_iovcnt = nblk;
	uio->uio_iov = kmem_zalloc(nblk * sizeof (iovec_t), KM_SLEEP);

	priv = kmem_zalloc(sizeof (dmu_xuio_t), KM_SLEEP);
	priv->cnt = nblk;
	priv->bufs = kmem_zalloc(nblk * sizeof (arc_buf_t *), KM_SLEEP);
	priv->iovp = uio->uio_iov;
	XUIO_XUZC_PRIV(xuio) = priv;

	if (XUIO_XUZC_RW(xuio) == UIO_READ)
		XUIOSTAT_INCR(xuiostat_onloan_rbuf, nblk);
	else
		XUIOSTAT_INCR(xuiostat_onloan_wbuf, nblk);

	return (0);
}

void
dmu_xuio_fini(xuio_t *xuio)
{
	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
	int nblk = priv->cnt;

	kmem_free(priv->iovp, nblk * sizeof (iovec_t));
	kmem_free(priv->bufs, nblk * sizeof (arc_buf_t *));
	kmem_free(priv, sizeof (dmu_xuio_t));

	if (XUIO_XUZC_RW(xuio) == UIO_READ)
		XUIOSTAT_INCR(xuiostat_onloan_rbuf, -nblk);
	else
		XUIOSTAT_INCR(xuiostat_onloan_wbuf, -nblk);
}

/*
 * Initialize iov[priv->next] and priv->bufs[priv->next] with { off, n, abuf }
 * and increase priv->next by 1.
 */
int
dmu_xuio_add(xuio_t *xuio, arc_buf_t *abuf, offset_t off, size_t n)
{
	struct iovec *iov;
	uio_t *uio = &xuio->xu_uio;
	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
	int i = priv->next++;

	ASSERT(i < priv->cnt);
	ASSERT(off + n <= arc_buf_lsize(abuf));
	iov = uio->uio_iov + i;
	iov->iov_base = (char *)abuf->b_data + off;
	iov->iov_len = n;
	priv->bufs[i] = abuf;
	return (0);
}

int
dmu_xuio_cnt(xuio_t *xuio)
{
	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
	return (priv->cnt);
}

arc_buf_t *
dmu_xuio_arcbuf(xuio_t *xuio, int i)
{
	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);

	ASSERT(i < priv->cnt);
	return (priv->bufs[i]);
}

void
dmu_xuio_clear(xuio_t *xuio, int i)
{
	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);

	ASSERT(i < priv->cnt);
	priv->bufs[i] = NULL;
}

static void
xuio_stat_init(void)
{
	xuio_ksp = kstat_create("zfs", 0, "xuio_stats", "misc",
	    KSTAT_TYPE_NAMED, sizeof (xuio_stats) / sizeof (kstat_named_t),
	    KSTAT_FLAG_VIRTUAL);
	if (xuio_ksp != NULL) {
		xuio_ksp->ks_data = &xuio_stats;
		kstat_install(xuio_ksp);
	}
}

static void
xuio_stat_fini(void)
{
	if (xuio_ksp != NULL) {
		kstat_delete(xuio_ksp);
		xuio_ksp = NULL;
	}
}

void
xuio_stat_wbuf_copied(void)
{
	XUIOSTAT_BUMP(xuiostat_wbuf_copied);
}

void
xuio_stat_wbuf_nocopy(void)
{
	XUIOSTAT_BUMP(xuiostat_wbuf_nocopy);
}

#ifdef _KERNEL
int
dmu_read_uio_dnode(dnode_t *dn, uio_t *uio, uint64_t size)
{
	dmu_buf_t **dbp;
	int numbufs, i, err;
	xuio_t *xuio = NULL;

	/*
	 * NB: we could do this block-at-a-time, but it's nice
	 * to be reading in parallel.
	 */
	err = dmu_buf_hold_array_by_dnode(dn, uio->uio_loffset, size,
	    TRUE, FTAG, &numbufs, &dbp, 0);
	if (err)
		return (err);

	if (uio->uio_extflg == UIO_XUIO)
		xuio = (xuio_t *)uio;

	for (i = 0; i < numbufs; i++) {
		int tocpy;
		int bufoff;
		dmu_buf_t *db = dbp[i];

		ASSERT(size > 0);

		bufoff = uio->uio_loffset - db->db_offset;
		tocpy = (int)MIN(db->db_size - bufoff, size);

		if (xuio) {
			dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
			arc_buf_t *dbuf_abuf = dbi->db_buf;
			arc_buf_t *abuf = dbuf_loan_arcbuf(dbi);
			err = dmu_xuio_add(xuio, abuf, bufoff, tocpy);
			if (!err) {
				uio->uio_resid -= tocpy;
				uio->uio_loffset += tocpy;
			}

			if (abuf == dbuf_abuf)
				XUIOSTAT_BUMP(xuiostat_rbuf_nocopy);
			else
				XUIOSTAT_BUMP(xuiostat_rbuf_copied);
		} else {
			err = uiomove((char *)db->db_data + bufoff, tocpy,
			    UIO_READ, uio);
		}
		if (err)
			break;

		size -= tocpy;
	}
	dmu_buf_rele_array(dbp, numbufs, FTAG);

	return (err);
}

/*
 * Read 'size' bytes into the uio buffer.
 * From object zdb->db_object.
 * Starting at offset uio->uio_loffset.
 *
 * If the caller already has a dbuf in the target object
 * (e.g. its bonus buffer), this routine is faster than dmu_read_uio(),
 * because we don't have to find the dnode_t for the object.
 */
int
dmu_read_uio_dbuf(dmu_buf_t *zdb, uio_t *uio, uint64_t size)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)zdb;
	dnode_t *dn;
	int err;

	if (size == 0)
		return (0);

	DB_DNODE_ENTER(db);
	dn = DB_DNODE(db);
	err = dmu_read_uio_dnode(dn, uio, size);
	DB_DNODE_EXIT(db);

	return (err);
}

/*
 * Read 'size' bytes into the uio buffer.
 * From the specified object
 * Starting at offset uio->uio_loffset.
 */
int
dmu_read_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size)
{
	dnode_t *dn;
	int err;

	if (size == 0)
		return (0);

	err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);

	err = dmu_read_uio_dnode(dn, uio, size);

	dnode_rele(dn, FTAG);

	return (err);
}

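/*
 * Write 'size' bytes from the uio buffer to the given dnode, starting at
 * offset uio->uio_loffset, as part of the caller's transaction.
 */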
int
dmu_write_uio_dnode(dnode_t *dn, uio_t *uio, uint64_t size, dmu_tx_t *tx)
{
	dmu_buf_t **dbp;
	int numbufs;
	int err = 0;
	int i;

	err = dmu_buf_hold_array_by_dnode(dn, uio->uio_loffset, size,
	    FALSE, FTAG, &numbufs, &dbp, DMU_READ_PREFETCH);
	if (err)
		return (err);

	for (i = 0; i < numbufs; i++) {
		int tocpy;
		int bufoff;
		dmu_buf_t *db = dbp[i];

		ASSERT(size > 0);

		bufoff = uio->uio_loffset - db->db_offset;
		tocpy = (int)MIN(db->db_size - bufoff, size);

		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);

		if (tocpy == db->db_size)
			dmu_buf_will_fill(db, tx);
		else
			dmu_buf_will_dirty(db, tx);

		/*
		 * XXX uiomove could block forever (eg. nfs-backed
		 * pages).  There needs to be a uiolockdown() function
		 * to lock the pages in memory, so that uiomove won't
		 * block.
		 */
		err = uiomove((char *)db->db_data + bufoff, tocpy,
		    UIO_WRITE, uio);

		if (tocpy == db->db_size)
			dmu_buf_fill_done(db, tx);

		if (err)
			break;

		size -= tocpy;
	}

	dmu_buf_rele_array(dbp, numbufs, FTAG);
	return (err);
}

/*
 * Write 'size' bytes from the uio buffer.
 * To object zdb->db_object.
 * Starting at offset uio->uio_loffset.
 *
 * If the caller already has a dbuf in the target object
 * (e.g. its bonus buffer), this routine is faster than dmu_write_uio(),
 * because we don't have to find the dnode_t for the object.
 */
int
dmu_write_uio_dbuf(dmu_buf_t *zdb, uio_t *uio, uint64_t size,
    dmu_tx_t *tx)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)zdb;
	dnode_t *dn;
	int err;

	if (size == 0)
		return (0);

	DB_DNODE_ENTER(db);
	dn = DB_DNODE(db);
	err = dmu_write_uio_dnode(dn, uio, size, tx);
	DB_DNODE_EXIT(db);

	return (err);
}

/*
 * Write 'size' bytes from the uio buffer.
 * To the specified object.
 * Starting at offset uio->uio_loffset.
 */
int
dmu_write_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size,
    dmu_tx_t *tx)
{
	dnode_t *dn;
	int err;

	if (size == 0)
		return (0);

	err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);

	err = dmu_write_uio_dnode(dn, uio, size, tx);

	dnode_rele(dn, FTAG);

	return (err);
}

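/*
 * Write 'size' bytes from the given list of pages, starting at 'offset',
 * as part of the caller's transaction.
 */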
int
dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
    page_t *pp, dmu_tx_t *tx)
{
	dmu_buf_t **dbp;
	int numbufs, i;
	int err;

	if (size == 0)
		return (0);

	err = dmu_buf_hold_array(os, object, offset, size,
	    FALSE, FTAG, &numbufs, &dbp);
	if (err)
		return (err);

	for (i = 0; i < numbufs; i++) {
		int tocpy, copied, thiscpy;
		int bufoff;
		dmu_buf_t *db = dbp[i];
		caddr_t va;

		ASSERT(size > 0);
		ASSERT3U(db->db_size, >=, PAGESIZE);

		bufoff = offset - db->db_offset;
		tocpy = (int)MIN(db->db_size - bufoff, size);

		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);

		if (tocpy == db->db_size)
			dmu_buf_will_fill(db, tx);
		else
			dmu_buf_will_dirty(db, tx);

		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
			thiscpy = MIN(PAGESIZE, tocpy - copied);
			va = zfs_map_page(pp, S_READ);
			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
			zfs_unmap_page(pp, va);
			pp = pp->p_next;
			bufoff += PAGESIZE;
		}

		if (tocpy == db->db_size)
			dmu_buf_fill_done(db, tx);

		offset += tocpy;
		size -= tocpy;
	}
	dmu_buf_rele_array(dbp, numbufs, FTAG);
	return (err);
}
#endif

/*
 * Allocate a loaned anonymous arc buffer.
 */
arc_buf_t *
dmu_request_arcbuf(dmu_buf_t *handle, int size)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)handle;

	return (arc_loan_buf(db->db_objset->os_spa, B_FALSE, size));
}

/*
 * Free a loaned arc buffer.
 */
void
dmu_return_arcbuf(arc_buf_t *buf)
{
	arc_return_buf(buf, FTAG);
	arc_buf_destroy(buf, FTAG);
}

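/*
 * Copy the contents of the source dbuf's arc buffer into a newly loaned
 * arc buffer (preserving raw/encrypted buffers as such) and assign it to
 * the destination <object, offset> in the given transaction.
 */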
void
dmu_copy_from_buf(objset_t *os, uint64_t object, uint64_t offset,
    dmu_buf_t *handle, dmu_tx_t *tx)
{
	dmu_buf_t *dst_handle;
	dmu_buf_impl_t *dstdb;
	dmu_buf_impl_t *srcdb = (dmu_buf_impl_t *)handle;
	dmu_object_type_t type;
	arc_buf_t *abuf;
	uint64_t datalen;
	boolean_t byteorder;
	uint8_t salt[ZIO_DATA_SALT_LEN];
	uint8_t iv[ZIO_DATA_IV_LEN];
	uint8_t mac[ZIO_DATA_MAC_LEN];

	ASSERT3P(srcdb->db_buf, !=, NULL);

	/* hold the db that we want to write to */
	VERIFY0(dmu_buf_hold(os, object, offset, FTAG, &dst_handle,
	    DMU_READ_NO_DECRYPT));
	dstdb = (dmu_buf_impl_t *)dst_handle;
	datalen = arc_buf_size(srcdb->db_buf);

	DB_DNODE_ENTER(dstdb);
	type = DB_DNODE(dstdb)->dn_type;
	DB_DNODE_EXIT(dstdb);

	/* allocate an arc buffer that matches the type of srcdb->db_buf */
	if (arc_is_encrypted(srcdb->db_buf)) {
		arc_get_raw_params(srcdb->db_buf, &byteorder, salt, iv, mac);
		abuf = arc_loan_raw_buf(os->os_spa, dmu_objset_id(os),
		    byteorder, salt, iv, mac, type,
		    datalen, arc_buf_lsize(srcdb->db_buf),
		    arc_get_compression(srcdb->db_buf));
	} else {
		/* we won't get a compressed db back from dmu_buf_hold() */
		ASSERT3U(arc_get_compression(srcdb->db_buf),
		    ==, ZIO_COMPRESS_OFF);
		abuf = arc_loan_buf(os->os_spa,
		    DMU_OT_IS_METADATA(type), datalen);
	}

	ASSERT3U(datalen, ==, arc_buf_size(abuf));

	/* copy the data to the new buffer and assign it to the dstdb */
	bcopy(srcdb->db_buf->b_data, abuf->b_data, datalen);
	dbuf_assign_arcbuf(dstdb, abuf, tx);
	dmu_buf_rele(dst_handle, FTAG);
}

/*
 * When possible directly assign passed loaned arc buffer to a dbuf.
 * If this is not possible copy the contents of passed arc buf via
 * dmu_write().
 */
int
dmu_assign_arcbuf_by_dnode(dnode_t *dn, uint64_t offset, arc_buf_t *buf,
    dmu_tx_t *tx)
{
	dmu_buf_impl_t *db;
	objset_t *os = dn->dn_objset;
	uint64_t object = dn->dn_object;
	uint32_t blksz = (uint32_t)arc_buf_lsize(buf);
	uint64_t blkid;

	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	blkid = dbuf_whichblock(dn, 0, offset);
	db = dbuf_hold(dn, blkid, FTAG);
	rw_exit(&dn->dn_struct_rwlock);
	if (db == NULL)
		return (SET_ERROR(EIO));

	/*
	 * We can only assign if the offset is aligned, the arc buf is the
	 * same size as the dbuf, and the dbuf is not metadata.
	 */
	if (offset == db->db.db_offset && blksz == db->db.db_size) {
		dbuf_assign_arcbuf(db, buf, tx);
		dbuf_rele(db, FTAG);
	} else {
		/* compressed bufs must always be assignable to their dbuf */
		ASSERT3U(arc_get_compression(buf), ==, ZIO_COMPRESS_OFF);
		ASSERT(!(buf->b_flags & ARC_BUF_FLAG_COMPRESSED));

		os = dn->dn_objset;
		object = dn->dn_object;
		dbuf_rele(db, FTAG);
		dmu_write(os, object, offset, blksz, buf->b_data, tx);
		dmu_return_arcbuf(buf);
		XUIOSTAT_BUMP(xuiostat_wbuf_copied);
	}

	return (0);
}

int
dmu_assign_arcbuf_by_dbuf(dmu_buf_t *handle, uint64_t offset, arc_buf_t *buf,
    dmu_tx_t *tx)
{
	int err;
	dmu_buf_impl_t *dbuf = (dmu_buf_impl_t *)handle;

	DB_DNODE_ENTER(dbuf);
	err = dmu_assign_arcbuf_by_dnode(DB_DNODE(dbuf), offset, buf, tx);
	DB_DNODE_EXIT(dbuf);

	return (err);
}

typedef struct {
	dbuf_dirty_record_t	*dsa_dr;
	dmu_sync_cb_t		*dsa_done;
	zgd_t			*dsa_zgd;
	dmu_tx_t		*dsa_tx;
} dmu_sync_arg_t;

/* ARGSUSED */
static void
dmu_sync_ready(zio_t *zio, arc_buf_t *buf, void *varg)
{
	dmu_sync_arg_t *dsa = varg;
	dmu_buf_t *db = dsa->dsa_zgd->zgd_db;
	blkptr_t *bp = zio->io_bp;

	if (zio->io_error == 0) {
		if (BP_IS_HOLE(bp)) {
			/*
			 * A block of zeros may compress to a hole, but the
			 * block size still needs to be known for replay.
			 */
			BP_SET_LSIZE(bp, db->db_size);
		} else if (!BP_IS_EMBEDDED(bp)) {
			ASSERT(BP_GET_LEVEL(bp) == 0);
			BP_SET_FILL(bp, 1);
		}
	}
}

static void
dmu_sync_late_arrival_ready(zio_t *zio)
{
	dmu_sync_ready(zio, NULL, zio->io_private);
}

/* ARGSUSED */
static void
dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
{
	dmu_sync_arg_t *dsa = varg;
	dbuf_dirty_record_t *dr = dsa->dsa_dr;
	dmu_buf_impl_t *db = dr->dr_dbuf;
	zgd_t *zgd = dsa->dsa_zgd;

	/*
	 * Record the vdev(s) backing this blkptr so they can be flushed after
	 * the writes for the lwb have completed.
	 */
	if (zio->io_error == 0) {
		zil_lwb_add_block(zgd->zgd_lwb, zgd->zgd_bp);
	}

	mutex_enter(&db->db_mtx);
	ASSERT(dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC);
	if (zio->io_error == 0) {
		dr->dt.dl.dr_nopwrite = !!(zio->io_flags & ZIO_FLAG_NOPWRITE);
		if (dr->dt.dl.dr_nopwrite) {
			blkptr_t *bp = zio->io_bp;
			blkptr_t *bp_orig = &zio->io_bp_orig;
			uint8_t chksum = BP_GET_CHECKSUM(bp_orig);

			ASSERT(BP_EQUAL(bp, bp_orig));
			VERIFY(BP_EQUAL(bp, db->db_blkptr));
			ASSERT(zio->io_prop.zp_compress != ZIO_COMPRESS_OFF);
			ASSERT(zio_checksum_table[chksum].ci_flags &
			    ZCHECKSUM_FLAG_NOPWRITE);
		}
		dr->dt.dl.dr_overridden_by = *zio->io_bp;
		dr->dt.dl.dr_override_state = DR_OVERRIDDEN;
		dr->dt.dl.dr_copies = zio->io_prop.zp_copies;

		/*
		 * Old style holes are filled with all zeros, whereas
		 * new-style holes maintain their lsize, type, level,
		 * and birth time (see zio_write_compress). While we
		 * need to reset the BP_SET_LSIZE() call that happened
		 * in dmu_sync_ready for old style holes, we do *not*
		 * want to wipe out the information contained in new
		 * style holes. Thus, only zero out the block pointer if
		 * it's an old style hole.
		 */
		if (BP_IS_HOLE(&dr->dt.dl.dr_overridden_by) &&
		    dr->dt.dl.dr_overridden_by.blk_birth == 0)
			BP_ZERO(&dr->dt.dl.dr_overridden_by);
	} else {
		dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
	}
	cv_broadcast(&db->db_changed);
	mutex_exit(&db->db_mtx);

	dsa->dsa_done(dsa->dsa_zgd, zio->io_error);

	kmem_free(dsa, sizeof (*dsa));
}

static void
dmu_sync_late_arrival_done(zio_t *zio)
{
	blkptr_t *bp = zio->io_bp;
	dmu_sync_arg_t *dsa = zio->io_private;
	blkptr_t *bp_orig = &zio->io_bp_orig;
	zgd_t *zgd = dsa->dsa_zgd;

	if (zio->io_error == 0) {
		/*
		 * Record the vdev(s) backing this blkptr so they can be
		 * flushed after the writes for the lwb have completed.
		 */
		zil_lwb_add_block(zgd->zgd_lwb, zgd->zgd_bp);

		if (!BP_IS_HOLE(bp)) {
			ASSERT(!(zio->io_flags & ZIO_FLAG_NOPWRITE));
			ASSERT(BP_IS_HOLE(bp_orig) || !BP_EQUAL(bp, bp_orig));
			ASSERT(zio->io_bp->blk_birth == zio->io_txg);
			ASSERT(zio->io_txg > spa_syncing_txg(zio->io_spa));
			zio_free(zio->io_spa, zio->io_txg, zio->io_bp);
		}
	}

	dmu_tx_commit(dsa->dsa_tx);

	dsa->dsa_done(dsa->dsa_zgd, zio->io_error);

	abd_put(zio->io_abd);
	kmem_free(dsa, sizeof (*dsa));
}

static int
dmu_sync_late_arrival(zio_t *pio, objset_t *os, dmu_sync_cb_t *done, zgd_t *zgd,
    zio_prop_t *zp, zbookmark_phys_t *zb)
{
	dmu_sync_arg_t *dsa;
	dmu_tx_t *tx;

	tx = dmu_tx_create(os);
	dmu_tx_hold_space(tx, zgd->zgd_db->db_size);
	if (dmu_tx_assign(tx, TXG_WAIT) != 0) {
		dmu_tx_abort(tx);
		/* Make zl_get_data do txg_wait_synced() */
		return (SET_ERROR(EIO));
	}

	/*
	 * In order to prevent the zgd's lwb from being free'd prior to
	 * dmu_sync_late_arrival_done() being called, we have to ensure
	 * the lwb's "max txg" takes this tx's txg into account.
	 */
	zil_lwb_add_txg(zgd->zgd_lwb, dmu_tx_get_txg(tx));

	dsa = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
	dsa->dsa_dr = NULL;
	dsa->dsa_done = done;
	dsa->dsa_zgd = zgd;
	dsa->dsa_tx = tx;

	/*
	 * Since we are currently syncing this txg, it's nontrivial to
	 * determine what BP to nopwrite against, so we disable nopwrite.
	 *
	 * When syncing, the db_blkptr is initially the BP of the previous
	 * txg.  We can not nopwrite against it because it will be changed
	 * (this is similar to the non-late-arrival case where the dbuf is
	 * dirty in a future txg).
	 *
	 * Then dbuf_write_ready() sets db_blkptr to the location we will write.
	 * We can not nopwrite against it because although the BP will not
	 * (typically) be changed, the data has not yet been persisted to this
	 * location.
	 *
	 * Finally, when dbuf_write_done() is called, it is theoretically
	 * possible to always nopwrite, because the data that was written in
	 * this txg is the same data that we are trying to write.  However we
	 * would need to check that this dbuf is not dirty in any future
	 * txg's (as we do in the normal dmu_sync() path). For simplicity, we
	 * don't nopwrite in this case.
	 */
	zp->zp_nopwrite = B_FALSE;

	zio_nowait(zio_write(pio, os->os_spa, dmu_tx_get_txg(tx), zgd->zgd_bp,
	    abd_get_from_buf(zgd->zgd_db->db_data, zgd->zgd_db->db_size),
	    zgd->zgd_db->db_size, zgd->zgd_db->db_size, zp,
	    dmu_sync_late_arrival_ready, NULL, NULL, dmu_sync_late_arrival_done,
	    dsa, ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_CANFAIL, zb));

	return (0);
}
2002
2003/*
2004 * Intent log support: sync the block associated with db to disk.
2005 * N.B. and XXX: the caller is responsible for making sure that the
2006 * data isn't changing while dmu_sync() is writing it.
2007 *
2008 * Return values:
2009 *
2010 *	EEXIST: this txg has already been synced, so there's nothing to do.
2011 *		The caller should not log the write.
2012 *
2013 *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
2014 *		The caller should not log the write.
2015 *
2016 *	EALREADY: this block is already in the process of being synced.
2017 *		The caller should track its progress (somehow).
2018 *
2019 *	EIO: could not do the I/O.
2020 *		The caller should do a txg_wait_synced().
2021 *
2022 *	0: the I/O has been initiated.
2023 *		The caller should log this blkptr in the done callback.
2024 *		It is possible that the I/O will fail, in which case
2025 *		the error will be reported to the done callback and
2026 *		propagated to pio from zio_done().
2027 */
2028int
2029dmu_sync(zio_t *pio, uint64_t txg, dmu_sync_cb_t *done, zgd_t *zgd)
2030{
2031	dmu_buf_impl_t *db = (dmu_buf_impl_t *)zgd->zgd_db;
2032	objset_t *os = db->db_objset;
2033	dsl_dataset_t *ds = os->os_dsl_dataset;
2034	dbuf_dirty_record_t *dr;
2035	dmu_sync_arg_t *dsa;
2036	zbookmark_phys_t zb;
2037	zio_prop_t zp;
2038	dnode_t *dn;
2039
2040	ASSERT(pio != NULL);
2041	ASSERT(txg != 0);
2042
2043	SET_BOOKMARK(&zb, ds->ds_object,
2044	    db->db.db_object, db->db_level, db->db_blkid);
2045
2046	DB_DNODE_ENTER(db);
2047	dn = DB_DNODE(db);
2048	dmu_write_policy(os, dn, db->db_level, WP_DMU_SYNC, &zp);
2049	DB_DNODE_EXIT(db);
2050
2051	/*
2052	 * If we're frozen (running ziltest), we always need to generate a bp.
2053	 */
2054	if (txg > spa_freeze_txg(os->os_spa))
2055		return (dmu_sync_late_arrival(pio, os, done, zgd, &zp, &zb));
2056
2057	/*
2058	 * Grabbing db_mtx now provides a barrier between dbuf_sync_leaf()
2059	 * and us.  If we determine that this txg is not yet syncing,
2060	 * but it begins to sync a moment later, that's OK because the
2061	 * sync thread will block in dbuf_sync_leaf() until we drop db_mtx.
2062	 */
2063	mutex_enter(&db->db_mtx);
2064
2065	if (txg <= spa_last_synced_txg(os->os_spa)) {
2066		/*
2067		 * This txg has already synced.  There's nothing to do.
2068		 */
2069		mutex_exit(&db->db_mtx);
2070		return (SET_ERROR(EEXIST));
2071	}
2072
2073	if (txg <= spa_syncing_txg(os->os_spa)) {
2074		/*
2075		 * This txg is currently syncing, so we can't mess with
2076		 * the dirty record anymore; just write a new log block.
2077		 */
2078		mutex_exit(&db->db_mtx);
2079		return (dmu_sync_late_arrival(pio, os, done, zgd, &zp, &zb));
2080	}
2081
2082	dr = db->db_last_dirty;
2083	while (dr && dr->dr_txg != txg)
2084		dr = dr->dr_next;
2085
2086	if (dr == NULL) {
2087		/*
2088		 * There's no dr for this dbuf, so it must have been freed.
2089		 * There's no need to log writes to freed blocks, so we're done.
2090		 */
2091		mutex_exit(&db->db_mtx);
2092		return (SET_ERROR(ENOENT));
2093	}
2094
2095	ASSERT(dr->dr_next == NULL || dr->dr_next->dr_txg < txg);
2096
2097	if (db->db_blkptr != NULL) {
2098		/*
2099		 * We need to fill in zgd_bp with the current blkptr so that
2100		 * the nopwrite code can check if we're writing the same
2101		 * data that's already on disk.  We can only nopwrite if we
2102		 * are sure that after making the copy, db_blkptr will not
2103		 * change until our i/o completes.  We ensure this by
2104		 * holding the db_mtx, and only allowing nopwrite if the
2105		 * block is not already dirty (see below).  This is verified
2106		 * by dmu_sync_done(), which VERIFYs that the db_blkptr has
2107		 * not changed.
2108		 */
2109		*zgd->zgd_bp = *db->db_blkptr;
2110	}
2111
2112	/*
2113	 * Assume the on-disk data is X, the current syncing data (in
2114	 * txg - 1) is Y, and the current in-memory data is Z (currently
2115	 * in dmu_sync).
2116	 *
2117	 * We usually want to perform a nopwrite if X and Z are the
2118	 * same.  However, if Y is different (i.e. the BP is going to
2119	 * change before this write takes effect), then a nopwrite will
2120	 * be incorrect - we would override with X, which could have
2121	 * been freed when Y was written.
2122	 *
2123	 * (Note that this is not a concern when we are nop-writing from
2124	 * syncing context, because X and Y must be identical, because
2125	 * all previous txgs have been synced.)
2126	 *
2127	 * Therefore, we disable nopwrite if the current BP could change
2128	 * before this TXG.  There are two ways it could change: by
2129	 * being dirty (dr_next is non-NULL), or by being freed
2130	 * (dnode_block_freed()).  This behavior is verified by
2131	 * zio_done(), which VERIFYs that the override BP is identical
2132	 * to the on-disk BP.
2133	 */
2134	DB_DNODE_ENTER(db);
2135	dn = DB_DNODE(db);
2136	if (dr->dr_next != NULL || dnode_block_freed(dn, db->db_blkid))
2137		zp.zp_nopwrite = B_FALSE;
2138	DB_DNODE_EXIT(db);
2139
2140	ASSERT(dr->dr_txg == txg);
2141	if (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC ||
2142	    dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
2143		/*
2144		 * We have already issued a sync write for this buffer,
2145		 * or this buffer has already been synced.  It could not
2146		 * have been dirtied since, or we would have cleared the state.
2147		 */
2148		mutex_exit(&db->db_mtx);
2149		return (SET_ERROR(EALREADY));
2150	}
2151
2152	ASSERT(dr->dt.dl.dr_override_state == DR_NOT_OVERRIDDEN);
2153	dr->dt.dl.dr_override_state = DR_IN_DMU_SYNC;
2154	mutex_exit(&db->db_mtx);
2155
2156	dsa = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
2157	dsa->dsa_dr = dr;
2158	dsa->dsa_done = done;
2159	dsa->dsa_zgd = zgd;
2160	dsa->dsa_tx = NULL;
2161
2162	zio_nowait(arc_write(pio, os->os_spa, txg,
2163	    zgd->zgd_bp, dr->dt.dl.dr_data, DBUF_IS_L2CACHEABLE(db),
2164	    &zp, dmu_sync_ready, NULL, NULL, dmu_sync_done, dsa,
2165	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_CANFAIL, &zb));
2166
2167	return (0);
2168}
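
/*
 * Hedged usage sketch (comment only, not compiled): one way a ZIL
 * get_data callback could act on the dmu_sync() contract documented
 * above.  The names lwb_zio, wr_txg, done_cb, and zgd are illustrative
 * placeholders, not an actual in-tree caller.  EEXIST and ENOENT mean
 * "don't log the write", EIO means "fall back to waiting for the txg
 * to sync", and on 0 the done callback logs the resulting blkptr.
 *
 *	error = dmu_sync(lwb_zio, wr_txg, done_cb, zgd);
 *	if (error == EEXIST || error == ENOENT)
 *		error = ENOENT;
 *	else if (error == EIO)
 *		txg_wait_synced(dmu_objset_pool(os), wr_txg);
 */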

int
dmu_object_set_nlevels(objset_t *os, uint64_t object, int nlevels, dmu_tx_t *tx)
{
	dnode_t *dn;
	int err;

	err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);
	err = dnode_set_nlevels(dn, nlevels, tx);
	dnode_rele(dn, FTAG);
	return (err);
}

int
dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
    dmu_tx_t *tx)
{
	dnode_t *dn;
	int err;

	err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);
	err = dnode_set_blksz(dn, size, ibs, tx);
	dnode_rele(dn, FTAG);
	return (err);
}

int
dmu_object_set_maxblkid(objset_t *os, uint64_t object, uint64_t maxblkid,
    dmu_tx_t *tx)
{
	dnode_t *dn;
	int err;

	err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);
	rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
	dnode_new_blkid(dn, maxblkid, tx, B_FALSE, B_TRUE);
	rw_exit(&dn->dn_struct_rwlock);
	dnode_rele(dn, FTAG);
	return (0);
}

void
dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
    dmu_tx_t *tx)
{
	dnode_t *dn;

	/*
	 * Send streams include each object's checksum function.  This
	 * check ensures that the receiving system can understand the
	 * checksum function transmitted.
	 */
	ASSERT3U(checksum, <, ZIO_CHECKSUM_LEGACY_FUNCTIONS);

	VERIFY0(dnode_hold(os, object, FTAG, &dn));
	ASSERT3U(checksum, <, ZIO_CHECKSUM_FUNCTIONS);
	dn->dn_checksum = checksum;
	dnode_setdirty(dn, tx);
	dnode_rele(dn, FTAG);
}

void
dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
    dmu_tx_t *tx)
{
	dnode_t *dn;

	/*
	 * Send streams include each object's compression function.  This
	 * check ensures that the receiving system can understand the
	 * compression function transmitted.
	 */
	ASSERT3U(compress, <, ZIO_COMPRESS_LEGACY_FUNCTIONS);

	VERIFY0(dnode_hold(os, object, FTAG, &dn));
	dn->dn_compress = compress;
	dnode_setdirty(dn, tx);
	dnode_rele(dn, FTAG);
}

/*
 * When the "redundant_metadata" property is set to "most", only indirect
 * blocks of this level and higher will have an additional ditto block.
 */
int zfs_redundant_metadata_most_ditto_level = 2;

void
dmu_write_policy(objset_t *os, dnode_t *dn, int level, int wp, zio_prop_t *zp)
{
	dmu_object_type_t type = dn ? dn->dn_type : DMU_OT_OBJSET;
	boolean_t ismd = (level > 0 || DMU_OT_IS_METADATA(type) ||
	    (wp & WP_SPILL));
	enum zio_checksum checksum = os->os_checksum;
	enum zio_compress compress = os->os_compress;
	enum zio_checksum dedup_checksum = os->os_dedup_checksum;
	boolean_t dedup = B_FALSE;
	boolean_t nopwrite = B_FALSE;
	boolean_t dedup_verify = os->os_dedup_verify;
	boolean_t encrypt = B_FALSE;
	int copies = os->os_copies;

	/*
	 * We maintain different write policies for each of the following
	 * types of data:
	 *	 1. metadata
	 *	 2. preallocated blocks (i.e. level-0 blocks of a dump device)
	 *	 3. all other level 0 blocks
	 */
	if (ismd) {
		/*
		 * XXX -- we should design a compression algorithm
		 * that specializes in arrays of bps.
		 */
		compress = zio_compress_select(os->os_spa,
		    ZIO_COMPRESS_ON, ZIO_COMPRESS_ON);

		/*
		 * Metadata always gets checksummed.  If the data
		 * checksum is multi-bit correctable, and it's not a
		 * ZBT-style checksum, then it's suitable for metadata
		 * as well.  Otherwise, the metadata checksum defaults
		 * to fletcher4.
		 */
		if (!(zio_checksum_table[checksum].ci_flags &
		    ZCHECKSUM_FLAG_METADATA) ||
		    (zio_checksum_table[checksum].ci_flags &
		    ZCHECKSUM_FLAG_EMBEDDED))
			checksum = ZIO_CHECKSUM_FLETCHER_4;

		if (os->os_redundant_metadata == ZFS_REDUNDANT_METADATA_ALL ||
		    (os->os_redundant_metadata ==
		    ZFS_REDUNDANT_METADATA_MOST &&
		    (level >= zfs_redundant_metadata_most_ditto_level ||
		    DMU_OT_IS_METADATA(type) || (wp & WP_SPILL))))
			copies++;
	} else if (wp & WP_NOFILL) {
		ASSERT(level == 0);

		/*
		 * If we're writing preallocated blocks, we aren't actually
		 * writing them so don't set any policy properties.  These
		 * blocks are currently only used by an external subsystem
		 * outside of zfs (i.e. dump) and not written by the zio
		 * pipeline.
		 */
		compress = ZIO_COMPRESS_OFF;
		checksum = ZIO_CHECKSUM_NOPARITY;
	} else {
		compress = zio_compress_select(os->os_spa, dn->dn_compress,
		    compress);

		checksum = (dedup_checksum == ZIO_CHECKSUM_OFF) ?
		    zio_checksum_select(dn->dn_checksum, checksum) :
		    dedup_checksum;

		/*
		 * Determine dedup setting.  If we are in dmu_sync(),
		 * we won't actually dedup now because that's all
		 * done in syncing context; but we do want to use the
		 * dedup checksum.  If the checksum is not strong
		 * enough to ensure unique signatures, force
		 * dedup_verify.
		 */
		if (dedup_checksum != ZIO_CHECKSUM_OFF) {
			dedup = (wp & WP_DMU_SYNC) ? B_FALSE : B_TRUE;
			if (!(zio_checksum_table[checksum].ci_flags &
			    ZCHECKSUM_FLAG_DEDUP))
				dedup_verify = B_TRUE;
		}

		/*
		 * Enable nopwrite if we have a secure enough checksum
		 * algorithm (see comment in zio_nop_write) and
		 * compression is enabled.  We don't enable nopwrite if
		 * dedup is enabled as the two features are mutually
		 * exclusive.
		 */
		nopwrite = (!dedup && (zio_checksum_table[checksum].ci_flags &
		    ZCHECKSUM_FLAG_NOPWRITE) &&
		    compress != ZIO_COMPRESS_OFF && zfs_nopwrite_enabled);
	}

	/*
	 * All objects in an encrypted objset are protected from modification
	 * via a MAC. Encrypted objects store their IV and salt in the last DVA
	 * in the bp, so we cannot use all copies. Encrypted objects are also
	 * not subject to nopwrite since writing the same data will still
	 * result in a new ciphertext. Only encrypted blocks can be dedup'd
	 * to avoid ambiguity in the dedup code since the DDT does not store
	 * object types.
	 */
	if (os->os_encrypted && (wp & WP_NOFILL) == 0) {
		encrypt = B_TRUE;

		if (DMU_OT_IS_ENCRYPTED(type)) {
			copies = MIN(copies, SPA_DVAS_PER_BP - 1);
			nopwrite = B_FALSE;
		} else {
			dedup = B_FALSE;
		}

		if (level <= 0 &&
		    (type == DMU_OT_DNODE || type == DMU_OT_OBJSET)) {
			compress = ZIO_COMPRESS_EMPTY;
		}
	}

	zp->zp_compress = compress;
	zp->zp_checksum = checksum;
	zp->zp_type = (wp & WP_SPILL) ? dn->dn_bonustype : type;
	zp->zp_level = level;
	zp->zp_copies = MIN(copies, spa_max_replication(os->os_spa));
	zp->zp_dedup = dedup;
	zp->zp_dedup_verify = dedup && dedup_verify;
	zp->zp_nopwrite = nopwrite;
	zp->zp_zpl_smallblk = DMU_OT_IS_FILE(zp->zp_type) ?
	    os->os_zpl_special_smallblock : 0;
	zp->zp_encrypt = encrypt;
	zp->zp_byteorder = ZFS_HOST_BYTEORDER;
	bzero(zp->zp_salt, ZIO_DATA_SALT_LEN);
	bzero(zp->zp_iv, ZIO_DATA_IV_LEN);
	bzero(zp->zp_mac, ZIO_DATA_MAC_LEN);
}
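
/*
 * Hedged example of the policy above (assumes the default tunables):
 * with redundant_metadata=most and zfs_redundant_metadata_most_ditto_level
 * of 2, a level-1 indirect block of a plain file keeps zp_copies equal to
 * os_copies, while a level-2 indirect block, a spill block, or any DMU
 * metadata object gets os_copies + 1, capped at spa_max_replication().
 * Callers obtain the policy the same way dmu_sync() does above:
 *
 *	zio_prop_t zp;
 *	dmu_write_policy(os, dn, db->db_level, WP_DMU_SYNC, &zp);
 */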

int
dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
{
	dnode_t *dn;
	int err;

	/*
	 * Sync any current changes before we go trundling through
	 * the block pointers.
	 */
	err = dmu_object_wait_synced(os, object);
	if (err) {
		return (err);
	}

	err = dnode_hold(os, object, FTAG, &dn);
	if (err) {
		return (err);
	}

	err = dnode_next_offset(dn, (hole ? DNODE_FIND_HOLE : 0), off, 1, 1, 0);
	dnode_rele(dn, FTAG);

	return (err);
}
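
/*
 * Hedged usage sketch (comment only): a SEEK_DATA/SEEK_HOLE style caller
 * could drive dmu_offset_next() roughly as below.  The names "offset" and
 * "file_sz" are illustrative; dnode_next_offset() returns ESRCH when no
 * matching block is found, which a hole-seeking caller can treat as the
 * virtual hole at end of file.
 *
 *	uint64_t noff = offset;
 *	error = dmu_offset_next(os, object, hole, &noff);
 *	if (error == 0)
 *		offset = noff;
 *	else if (error == ESRCH && hole)
 *		offset = file_sz;
 */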

/*
 * Given a ZFS object, if its dnode is dirty in any TXG, this function
 * flushes all dirty blocks to disk so that the DMU object info is up
 * to date.  A more efficient future version might find the highest
 * dirty TXG and wait for only that one to be synced.
 */
int
dmu_object_wait_synced(objset_t *os, uint64_t object)
{
	dnode_t *dn;
	int error, i;

	error = dnode_hold(os, object, FTAG, &dn);
	if (error) {
		return (error);
	}

	for (i = 0; i < TXG_SIZE; i++) {
		if (list_link_active(&dn->dn_dirty_link[i])) {
			break;
		}
	}
	dnode_rele(dn, FTAG);
	if (i != TXG_SIZE) {
		txg_wait_synced(dmu_objset_pool(os), 0);
	}

	return (0);
}
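
/*
 * Hedged usage note: dmu_offset_next() above calls this before walking
 * block pointers; a hypothetical caller that wants the object info
 * counters below to reflect all completed changes could do the same:
 *
 *	(void) dmu_object_wait_synced(os, object);
 *	error = dmu_object_info(os, object, &doi);
 */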

void
dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
{
	dnode_phys_t *dnp;

	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	mutex_enter(&dn->dn_mtx);

	dnp = dn->dn_phys;

	doi->doi_data_block_size = dn->dn_datablksz;
	doi->doi_metadata_block_size = dn->dn_indblkshift ?
	    1ULL << dn->dn_indblkshift : 0;
	doi->doi_type = dn->dn_type;
	doi->doi_bonus_type = dn->dn_bonustype;
	doi->doi_bonus_size = dn->dn_bonuslen;
	doi->doi_dnodesize = dn->dn_num_slots << DNODE_SHIFT;
	doi->doi_indirection = dn->dn_nlevels;
	doi->doi_checksum = dn->dn_checksum;
	doi->doi_compress = dn->dn_compress;
	doi->doi_nblkptr = dn->dn_nblkptr;
	doi->doi_physical_blocks_512 = (DN_USED_BYTES(dnp) + 256) >> 9;
	doi->doi_max_offset = (dn->dn_maxblkid + 1) * dn->dn_datablksz;
	doi->doi_fill_count = 0;
	for (int i = 0; i < dnp->dn_nblkptr; i++)
		doi->doi_fill_count += BP_GET_FILL(&dnp->dn_blkptr[i]);

	mutex_exit(&dn->dn_mtx);
	rw_exit(&dn->dn_struct_rwlock);
}
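
/*
 * Hedged worked example of the conversions above: for a dnode with
 * dn_datablksz == 128K, dn_maxblkid == 7, and DN_USED_BYTES(dnp) ==
 * 300000, doi_max_offset is 8 * 128K == 1M and doi_physical_blocks_512
 * is (300000 + 256) >> 9 == 586.
 */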

/*
 * Get information on a DMU object.
 * If doi is NULL, just indicates whether the object exists.
 */
int
dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
{
	dnode_t *dn;
	int err = dnode_hold(os, object, FTAG, &dn);

	if (err)
		return (err);

	if (doi != NULL)
		dmu_object_info_from_dnode(dn, doi);

	dnode_rele(dn, FTAG);
	return (0);
}

/*
 * As above, but faster; can be used when you have a held dbuf in hand.
 */
void
dmu_object_info_from_db(dmu_buf_t *db_fake, dmu_object_info_t *doi)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;

	DB_DNODE_ENTER(db);
	dmu_object_info_from_dnode(DB_DNODE(db), doi);
	DB_DNODE_EXIT(db);
}

/*
 * Faster still when you only care about the size.
 * This is specifically optimized for zfs_getattr().
 */
void
dmu_object_size_from_db(dmu_buf_t *db_fake, uint32_t *blksize,
    u_longlong_t *nblk512)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
	dnode_t *dn;

	DB_DNODE_ENTER(db);
	dn = DB_DNODE(db);

	*blksize = dn->dn_datablksz;
	/* add in number of slots used for the dnode itself */
	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
	    SPA_MINBLOCKSHIFT) + dn->dn_num_slots;
	DB_DNODE_EXIT(db);
}

void
dmu_object_dnsize_from_db(dmu_buf_t *db_fake, int *dnsize)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
	dnode_t *dn;

	DB_DNODE_ENTER(db);
	dn = DB_DNODE(db);
	*dnsize = dn->dn_num_slots << DNODE_SHIFT;
	DB_DNODE_EXIT(db);
}

void
byteswap_uint64_array(void *vbuf, size_t size)
{
	uint64_t *buf = vbuf;
	size_t count = size >> 3;
	int i;

	ASSERT((size & 7) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_64(buf[i]);
}

void
byteswap_uint32_array(void *vbuf, size_t size)
{
	uint32_t *buf = vbuf;
	size_t count = size >> 2;
	int i;

	ASSERT((size & 3) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_32(buf[i]);
}

void
byteswap_uint16_array(void *vbuf, size_t size)
{
	uint16_t *buf = vbuf;
	size_t count = size >> 1;
	int i;

	ASSERT((size & 1) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_16(buf[i]);
}

/* ARGSUSED */
void
byteswap_uint8_array(void *vbuf, size_t size)
{
}

void
dmu_init(void)
{
	abd_init();
	zfs_dbgmsg_init();
	sa_cache_init();
	xuio_stat_init();
	dmu_objset_init();
	dnode_init();
	zfetch_init();
	l2arc_init();
	arc_init();
	dbuf_init();
}

void
dmu_fini(void)
{
	arc_fini(); /* arc depends on l2arc, so arc must go first */
	l2arc_fini();
	zfetch_fini();
	dbuf_fini();
	dnode_fini();
	dmu_objset_fini();
	xuio_stat_fini();
	sa_cache_fini();
	zfs_dbgmsg_fini();
	abd_fini();
}