dmu.c revision 06e0070d70ba2ee95f5aa2645423eb2cf1546788
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 */

#include <sys/dmu.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dbuf.h>
#include <sys/dnode.h>
#include <sys/zfs_context.h>
#include <sys/dmu_objset.h>
#include <sys/dmu_traverse.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/dsl_prop.h>
#include <sys/dmu_zfetch.h>
#include <sys/zfs_ioctl.h>
#include <sys/zap.h>
#include <sys/zio_checksum.h>
#include <sys/sa.h>
#ifdef _KERNEL
#include <sys/vmsystm.h>
#include <sys/zfs_znode.h>
#endif

const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
	{	zap_byteswap,		TRUE,	"object directory"	},
	{	byteswap_uint64_array,	TRUE,	"object array"		},
	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
	{	byteswap_uint64_array,	TRUE,	"bplist"		},
	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
	{	zap_byteswap,		TRUE,	"DSL directory child map"},
	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
	{	zap_byteswap,		TRUE,	"DSL props"		},
	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
	{	zfs_oldacl_byteswap,	TRUE,	"ZFS V0 ACL"		},
	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
	{	zap_byteswap,		TRUE,	"ZFS directory"		},
	{	zap_byteswap,		TRUE,	"ZFS master node"	},
	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
	{	zap_byteswap,		TRUE,	"zvol prop"		},
	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
	{	zap_byteswap,		TRUE,	"other ZAP"		},
	{	zap_byteswap,		TRUE,	"persistent error log"	},
	{	byteswap_uint8_array,	TRUE,	"SPA history"		},
	{	byteswap_uint64_array,	TRUE,	"SPA history offsets"	},
	{	zap_byteswap,		TRUE,	"Pool properties"	},
	{	zap_byteswap,		TRUE,	"DSL permissions"	},
	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
	{	byteswap_uint8_array,	TRUE,	"ZFS SYSACL"		},
	{	byteswap_uint8_array,	TRUE,	"FUID table"		},
	{	byteswap_uint64_array,	TRUE,	"FUID table size"	},
	{	zap_byteswap,		TRUE,	"DSL dataset next clones"},
	{	zap_byteswap,		TRUE,	"scrub work queue"	},
	{	zap_byteswap,		TRUE,	"ZFS user/group used"	},
	{	zap_byteswap,		TRUE,	"ZFS user/group quota"	},
	{	zap_byteswap,		TRUE,	"snapshot refcount tags"},
	{	zap_byteswap,		TRUE,	"DDT ZAP algorithm"	},
	{	zap_byteswap,		TRUE,	"DDT statistics"	},
	{	byteswap_uint8_array,	TRUE,	"System attributes"	},
	{	zap_byteswap,		TRUE,	"SA master node"	},
	{	zap_byteswap,		TRUE,	"SA attr registration"	},
	{	zap_byteswap,		TRUE,	"SA attr layouts"	}, };

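/*
 * The table above is indexed by dmu_object_type_t.  A hypothetical consumer
 * that needs to byteswap an in-core copy of an object's data might do
 * roughly the following (sketch only; "type", "buf" and "size" are
 * caller-supplied values, not part of this file):
 *
 *	if (type < DMU_OT_NUMTYPES)
 *		dmu_ot[type].ot_byteswap(buf, size);
 */
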
int
dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
    void *tag, dmu_buf_t **dbp)
{
	dnode_t *dn;
	uint64_t blkid;
	dmu_buf_impl_t *db;
	int err;

	err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);
	blkid = dbuf_whichblock(dn, offset);
	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	db = dbuf_hold(dn, blkid, tag);
	rw_exit(&dn->dn_struct_rwlock);
	if (db == NULL) {
		err = EIO;
	} else {
		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
		if (err) {
			dbuf_rele(db, tag);
			db = NULL;
		}
	}

	dnode_rele(dn, FTAG);
	*dbp = &db->db;	/* NULL db plus first field offset is NULL */
	return (err);
}
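
/*
 * Example (hypothetical caller, sketch only): copying the first "len"
 * bytes of an object out of its first block via dmu_buf_hold(), then
 * dropping the hold with dmu_buf_rele():
 *
 *	dmu_buf_t *db;
 *	int err = dmu_buf_hold(os, object, 0, FTAG, &db);
 *	if (err == 0) {
 *		bcopy(db->db_data, buf, MIN(len, db->db_size));
 *		dmu_buf_rele(db, FTAG);
 *	}
 */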

int
dmu_bonus_max(void)
{
	return (DN_MAX_BONUSLEN);
}

int
dmu_set_bonus(dmu_buf_t *db, int newsize, dmu_tx_t *tx)
{
	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;

	if (dn->dn_bonus != (dmu_buf_impl_t *)db)
		return (EINVAL);
	if (newsize < 0 || newsize > db->db_size)
		return (EINVAL);
	dnode_setbonuslen(dn, newsize, tx);
	return (0);
}

int
dmu_set_bonustype(dmu_buf_t *db, dmu_object_type_t type, dmu_tx_t *tx)
{
	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;

	if (type > DMU_OT_NUMTYPES)
		return (EINVAL);

	if (dn->dn_bonus != (dmu_buf_impl_t *)db)
		return (EINVAL);

	dnode_setbonus_type(dn, type, tx);
	return (0);
}

int
dmu_rm_spill(objset_t *os, uint64_t object, dmu_tx_t *tx)
{
	dnode_t *dn;
	int error;

	error = dnode_hold(os, object, FTAG, &dn);
	if (error)
		return (error);
	dbuf_rm_spill(dn, tx);
	rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
	dnode_rm_spill(dn, tx);
	rw_exit(&dn->dn_struct_rwlock);
	dnode_rele(dn, FTAG);
	return (error);
}

/*
 * returns ENOENT, EIO, or 0.
 */
int
dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
{
	dnode_t *dn;
	dmu_buf_impl_t *db;
	int error;

	error = dnode_hold(os, object, FTAG, &dn);
	if (error)
		return (error);

	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	if (dn->dn_bonus == NULL) {
		rw_exit(&dn->dn_struct_rwlock);
		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
		if (dn->dn_bonus == NULL)
			dbuf_create_bonus(dn);
	}
	db = dn->dn_bonus;
	rw_exit(&dn->dn_struct_rwlock);

	/* as long as the bonus buf is held, the dnode will be held */
	if (refcount_add(&db->db_holds, tag) == 1)
		VERIFY(dnode_add_ref(dn, db));

	dnode_rele(dn, FTAG);

	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));

	*dbp = &db->db;
	return (0);
}

/*
 * returns ENOENT, EIO, or 0.
 *
 * This interface will allocate a blank spill dbuf when a spill blk
 * doesn't already exist on the dnode.
 *
 * if you only want to find an already existing spill db, then
 * dmu_spill_hold_existing() should be used.
 */
int
dmu_spill_hold_by_dnode(dnode_t *dn, uint32_t flags, void *tag, dmu_buf_t **dbp)
{
	dmu_buf_impl_t *db = NULL;
	int err;

	if ((flags & DB_RF_HAVESTRUCT) == 0)
		rw_enter(&dn->dn_struct_rwlock, RW_READER);

	db = dbuf_hold(dn, DMU_SPILL_BLKID, tag);

	if ((flags & DB_RF_HAVESTRUCT) == 0)
		rw_exit(&dn->dn_struct_rwlock);

	ASSERT(db != NULL);
	err = dbuf_read(db, NULL, DB_RF_MUST_SUCCEED | flags);
	*dbp = &db->db;
	return (err);
}

int
dmu_spill_hold_existing(dmu_buf_t *bonus, void *tag, dmu_buf_t **dbp)
{
	dnode_t *dn = ((dmu_buf_impl_t *)bonus)->db_dnode;
	int err;

	if (spa_version(dn->dn_objset->os_spa) < SPA_VERSION_SA)
		return (EINVAL);
	rw_enter(&dn->dn_struct_rwlock, RW_READER);

	if (!dn->dn_have_spill) {
		rw_exit(&dn->dn_struct_rwlock);
		return (ENOENT);
	}
	err = dmu_spill_hold_by_dnode(dn, DB_RF_HAVESTRUCT, tag, dbp);
	rw_exit(&dn->dn_struct_rwlock);
	return (err);
}

int
dmu_spill_hold_by_bonus(dmu_buf_t *bonus, void *tag, dmu_buf_t **dbp)
{
	return (dmu_spill_hold_by_dnode(((dmu_buf_impl_t *)bonus)->db_dnode,
	    0, tag, dbp));
}
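
/*
 * Example (hypothetical caller, sketch only): looking up an existing
 * spill block through a held bonus dbuf, treating ENOENT as "all
 * attributes fit in the bonus buffer":
 *
 *	dmu_buf_t *spill_db;
 *	int err = dmu_spill_hold_existing(bonus_db, FTAG, &spill_db);
 *	if (err == 0) {
 *		... read attributes from spill_db->db_data ...
 *		dmu_buf_rele(spill_db, FTAG);
 *	} else if (err == ENOENT) {
 *		... no spill block; use only the bonus buffer ...
 *	}
 */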

/*
 * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
 * to take a held dnode rather than <os, object> -- the lookup is wasteful,
 * and can induce severe lock contention when writing to several files
 * whose dnodes are in the same block.
 */
static int
dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset, uint64_t length,
    int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp, uint32_t flags)
{
	dsl_pool_t *dp = NULL;
	dmu_buf_t **dbp;
	uint64_t blkid, nblks, i;
	uint32_t dbuf_flags;
	int err;
	zio_t *zio;
	hrtime_t start;

	ASSERT(length <= DMU_MAX_ACCESS);

	dbuf_flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT | DB_RF_HAVESTRUCT;
	if (flags & DMU_READ_NO_PREFETCH || length > zfetch_array_rd_sz)
		dbuf_flags |= DB_RF_NOPREFETCH;

	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	if (dn->dn_datablkshift) {
		int blkshift = dn->dn_datablkshift;
		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
		    P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
	} else {
		if (offset + length > dn->dn_datablksz) {
			zfs_panic_recover("zfs: accessing past end of object "
			    "%llx/%llx (size=%u access=%llu+%llu)",
			    (longlong_t)dn->dn_objset->
			    os_dsl_dataset->ds_object,
			    (longlong_t)dn->dn_object, dn->dn_datablksz,
			    (longlong_t)offset, (longlong_t)length);
			rw_exit(&dn->dn_struct_rwlock);
			return (EIO);
		}
		nblks = 1;
	}
	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);

	if (dn->dn_objset->os_dsl_dataset)
		dp = dn->dn_objset->os_dsl_dataset->ds_dir->dd_pool;
	if (dp && dsl_pool_sync_context(dp))
		start = gethrtime();
	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, ZIO_FLAG_CANFAIL);
	blkid = dbuf_whichblock(dn, offset);
	for (i = 0; i < nblks; i++) {
		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
		if (db == NULL) {
			rw_exit(&dn->dn_struct_rwlock);
			dmu_buf_rele_array(dbp, nblks, tag);
			zio_nowait(zio);
			return (EIO);
		}
		/* initiate async i/o */
		if (read) {
			(void) dbuf_read(db, zio, dbuf_flags);
		}
		dbp[i] = &db->db;
	}
	rw_exit(&dn->dn_struct_rwlock);

	/* wait for async i/o */
	err = zio_wait(zio);
	/* track read overhead when we are in sync context */
	if (dp && dsl_pool_sync_context(dp))
		dp->dp_read_overhead += gethrtime() - start;
	if (err) {
		dmu_buf_rele_array(dbp, nblks, tag);
		return (err);
	}

	/* wait for other io to complete */
	if (read) {
		for (i = 0; i < nblks; i++) {
			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
			mutex_enter(&db->db_mtx);
			while (db->db_state == DB_READ ||
			    db->db_state == DB_FILL)
				cv_wait(&db->db_changed, &db->db_mtx);
			if (db->db_state == DB_UNCACHED)
				err = EIO;
			mutex_exit(&db->db_mtx);
			if (err) {
				dmu_buf_rele_array(dbp, nblks, tag);
				return (err);
			}
		}
	}

	*numbufsp = nblks;
	*dbpp = dbp;
	return (0);
}

static int
dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
    uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
{
	dnode_t *dn;
	int err;

	err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);

	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
	    numbufsp, dbpp, DMU_READ_PREFETCH);

	dnode_rele(dn, FTAG);

	return (err);
}

int
dmu_buf_hold_array_by_bonus(dmu_buf_t *db, uint64_t offset,
    uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
{
	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
	int err;

	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
	    numbufsp, dbpp, DMU_READ_PREFETCH);

	return (err);
}

void
dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
{
	int i;
	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;

	if (numbufs == 0)
		return;

	for (i = 0; i < numbufs; i++) {
		if (dbp[i])
			dbuf_rele(dbp[i], tag);
	}

	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
}

void
dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
{
	dnode_t *dn;
	uint64_t blkid;
	int nblks, i, err;

	if (zfs_prefetch_disable)
		return;

	if (len == 0) {  /* they're interested in the bonus buffer */
		dn = os->os_meta_dnode;

		if (object == 0 || object >= DN_MAX_OBJECT)
			return;

		rw_enter(&dn->dn_struct_rwlock, RW_READER);
		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
		dbuf_prefetch(dn, blkid);
		rw_exit(&dn->dn_struct_rwlock);
		return;
	}

	/*
	 * XXX - Note, if the dnode for the requested object is not
	 * already cached, we will do a *synchronous* read in the
	 * dnode_hold() call.  The same is true for any indirects.
	 */
	err = dnode_hold(os, object, FTAG, &dn);
	if (err != 0)
		return;

	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	if (dn->dn_datablkshift) {
		int blkshift = dn->dn_datablkshift;
		nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
		    P2ALIGN(offset, 1<<blkshift)) >> blkshift;
	} else {
		nblks = (offset < dn->dn_datablksz);
	}

	if (nblks != 0) {
		blkid = dbuf_whichblock(dn, offset);
		for (i = 0; i < nblks; i++)
			dbuf_prefetch(dn, blkid+i);
	}

	rw_exit(&dn->dn_struct_rwlock);

	dnode_rele(dn, FTAG);
}

/*
 * Get the next "chunk" of file data to free.  We traverse the file from
 * the end so that the file gets shorter over time (if we crash in the
 * middle, this will leave us in a better state).  We find allocated file
 * data by simply searching the allocated level 1 indirects.
 */
static int
get_next_chunk(dnode_t *dn, uint64_t *start, uint64_t limit)
{
	uint64_t len = *start - limit;
	uint64_t blkcnt = 0;
	uint64_t maxblks = DMU_MAX_ACCESS / (1ULL << (dn->dn_indblkshift + 1));
	uint64_t iblkrange =
	    dn->dn_datablksz * EPB(dn->dn_indblkshift, SPA_BLKPTRSHIFT);

	ASSERT(limit <= *start);

	if (len <= iblkrange * maxblks) {
		*start = limit;
		return (0);
	}
	ASSERT(ISP2(iblkrange));

	while (*start > limit && blkcnt < maxblks) {
		int err;

		/* find next allocated L1 indirect */
		err = dnode_next_offset(dn,
		    DNODE_FIND_BACKWARDS, start, 2, 1, 0);

		/* if there are no more, then we are done */
		if (err == ESRCH) {
			*start = limit;
			return (0);
		} else if (err) {
			return (err);
		}
		blkcnt += 1;

		/* reset offset to end of "next" block back */
		*start = P2ALIGN(*start, iblkrange);
		if (*start <= limit)
			*start = limit;
		else
			*start -= 1;
	}
	return (0);
}
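
/*
 * Worked example for get_next_chunk() (illustrative numbers only): with
 * 128K data blocks and 16K indirect blocks (dn_indblkshift == 14), an L1
 * indirect holds 1 << (14 - SPA_BLKPTRSHIFT) == 128 block pointers, so
 * iblkrange == 128K * 128 == 16M of file data, and maxblks ==
 * DMU_MAX_ACCESS / 32K.  Each chunk returned above therefore covers at
 * most maxblks allocated L1 ranges, i.e. at most maxblks * 16M bytes
 * freed under a single transaction in dmu_free_long_range_impl().
 */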

static int
dmu_free_long_range_impl(objset_t *os, dnode_t *dn, uint64_t offset,
    uint64_t length, boolean_t free_dnode)
{
	dmu_tx_t *tx;
	uint64_t object_size, start, end, len;
	boolean_t trunc = (length == DMU_OBJECT_END);
	int align, err;

	align = 1 << dn->dn_datablkshift;
	ASSERT(align > 0);
	object_size = align == 1 ? dn->dn_datablksz :
	    (dn->dn_maxblkid + 1) << dn->dn_datablkshift;

	end = offset + length;
	if (trunc || end > object_size)
		end = object_size;
	if (end <= offset)
		return (0);
	length = end - offset;

	while (length) {
		start = end;
		/* assert(offset <= start) */
		err = get_next_chunk(dn, &start, offset);
		if (err)
			return (err);
		len = trunc ? DMU_OBJECT_END : end - start;

		tx = dmu_tx_create(os);
		dmu_tx_hold_free(tx, dn->dn_object, start, len);
		err = dmu_tx_assign(tx, TXG_WAIT);
		if (err) {
			dmu_tx_abort(tx);
			return (err);
		}

		dnode_free_range(dn, start, trunc ? -1 : len, tx);

		if (start == 0 && free_dnode) {
			ASSERT(trunc);
			dnode_free(dn, tx);
		}

		length -= end - start;

		dmu_tx_commit(tx);
		end = start;
	}
	return (0);
}

int
dmu_free_long_range(objset_t *os, uint64_t object,
    uint64_t offset, uint64_t length)
{
	dnode_t *dn;
	int err;

	err = dnode_hold(os, object, FTAG, &dn);
	if (err != 0)
		return (err);
	err = dmu_free_long_range_impl(os, dn, offset, length, FALSE);
	dnode_rele(dn, FTAG);
	return (err);
}

int
dmu_free_object(objset_t *os, uint64_t object)
{
	dnode_t *dn;
	dmu_tx_t *tx;
	int err;

	err = dnode_hold_impl(os, object, DNODE_MUST_BE_ALLOCATED,
	    FTAG, &dn);
	if (err != 0)
		return (err);
	if (dn->dn_nlevels == 1) {
		tx = dmu_tx_create(os);
		dmu_tx_hold_bonus(tx, object);
		dmu_tx_hold_free(tx, dn->dn_object, 0, DMU_OBJECT_END);
		err = dmu_tx_assign(tx, TXG_WAIT);
		if (err == 0) {
			dnode_free_range(dn, 0, DMU_OBJECT_END, tx);
			dnode_free(dn, tx);
			dmu_tx_commit(tx);
		} else {
			dmu_tx_abort(tx);
		}
	} else {
		err = dmu_free_long_range_impl(os, dn, 0, DMU_OBJECT_END, TRUE);
	}
	dnode_rele(dn, FTAG);
	return (err);
}

int
dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
    uint64_t size, dmu_tx_t *tx)
{
	dnode_t *dn;
	int err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);
	ASSERT(offset < UINT64_MAX);
	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
	dnode_free_range(dn, offset, size, tx);
	dnode_rele(dn, FTAG);
	return (0);
}

int
dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
    void *buf, uint32_t flags)
{
	dnode_t *dn;
	dmu_buf_t **dbp;
	int numbufs, err;

	err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);

	/*
	 * Deal with odd block sizes, where there can't be data past the first
	 * block.  If we ever do the tail block optimization, we will need to
	 * handle that here as well.
	 */
	if (dn->dn_maxblkid == 0) {
		int newsz = offset > dn->dn_datablksz ? 0 :
		    MIN(size, dn->dn_datablksz - offset);
		bzero((char *)buf + newsz, size - newsz);
		size = newsz;
	}

	while (size > 0) {
		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
		int i;

		/*
		 * NB: we could do this block-at-a-time, but it's nice
		 * to be reading in parallel.
		 */
		err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
		    TRUE, FTAG, &numbufs, &dbp, flags);
		if (err)
			break;

		for (i = 0; i < numbufs; i++) {
			int tocpy;
			int bufoff;
			dmu_buf_t *db = dbp[i];

			ASSERT(size > 0);

			bufoff = offset - db->db_offset;
			tocpy = (int)MIN(db->db_size - bufoff, size);

			bcopy((char *)db->db_data + bufoff, buf, tocpy);

			offset += tocpy;
			size -= tocpy;
			buf = (char *)buf + tocpy;
		}
		dmu_buf_rele_array(dbp, numbufs, FTAG);
	}
	dnode_rele(dn, FTAG);
	return (err);
}

void
dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
    const void *buf, dmu_tx_t *tx)
{
	dmu_buf_t **dbp;
	int numbufs, i;

	if (size == 0)
		return;

	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
	    FALSE, FTAG, &numbufs, &dbp));

	for (i = 0; i < numbufs; i++) {
		int tocpy;
		int bufoff;
		dmu_buf_t *db = dbp[i];

		ASSERT(size > 0);

		bufoff = offset - db->db_offset;
		tocpy = (int)MIN(db->db_size - bufoff, size);

		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);

		if (tocpy == db->db_size)
			dmu_buf_will_fill(db, tx);
		else
			dmu_buf_will_dirty(db, tx);

		bcopy(buf, (char *)db->db_data + bufoff, tocpy);

		if (tocpy == db->db_size)
			dmu_buf_fill_done(db, tx);

		offset += tocpy;
		size -= tocpy;
		buf = (char *)buf + tocpy;
	}
	dmu_buf_rele_array(dbp, numbufs, FTAG);
}
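
/*
 * Example (hypothetical caller, sketch only): a minimal transactional
 * write of "len" bytes at offset "off" using dmu_write().  Error handling
 * other than tx assignment is omitted.
 *
 *	dmu_tx_t *tx = dmu_tx_create(os);
 *	dmu_tx_hold_write(tx, object, off, len);
 *	err = dmu_tx_assign(tx, TXG_WAIT);
 *	if (err != 0) {
 *		dmu_tx_abort(tx);
 *		return (err);
 *	}
 *	dmu_write(os, object, off, len, buf, tx);
 *	dmu_tx_commit(tx);
 */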

void
dmu_prealloc(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
    dmu_tx_t *tx)
{
	dmu_buf_t **dbp;
	int numbufs, i;

	if (size == 0)
		return;

	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
	    FALSE, FTAG, &numbufs, &dbp));

	for (i = 0; i < numbufs; i++) {
		dmu_buf_t *db = dbp[i];

		dmu_buf_will_not_fill(db, tx);
	}
	dmu_buf_rele_array(dbp, numbufs, FTAG);
}

/*
 * DMU support for xuio
 */
kstat_t *xuio_ksp = NULL;

int
dmu_xuio_init(xuio_t *xuio, int nblk)
{
	dmu_xuio_t *priv;
	uio_t *uio = &xuio->xu_uio;

	uio->uio_iovcnt = nblk;
	uio->uio_iov = kmem_zalloc(nblk * sizeof (iovec_t), KM_SLEEP);

	priv = kmem_zalloc(sizeof (dmu_xuio_t), KM_SLEEP);
	priv->cnt = nblk;
	priv->bufs = kmem_zalloc(nblk * sizeof (arc_buf_t *), KM_SLEEP);
	priv->iovp = uio->uio_iov;
	XUIO_XUZC_PRIV(xuio) = priv;

	if (XUIO_XUZC_RW(xuio) == UIO_READ)
		XUIOSTAT_INCR(xuiostat_onloan_rbuf, nblk);
	else
		XUIOSTAT_INCR(xuiostat_onloan_wbuf, nblk);

	return (0);
}

void
dmu_xuio_fini(xuio_t *xuio)
{
	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
	int nblk = priv->cnt;

	kmem_free(priv->iovp, nblk * sizeof (iovec_t));
	kmem_free(priv->bufs, nblk * sizeof (arc_buf_t *));
	kmem_free(priv, sizeof (dmu_xuio_t));

	if (XUIO_XUZC_RW(xuio) == UIO_READ)
		XUIOSTAT_INCR(xuiostat_onloan_rbuf, -nblk);
	else
		XUIOSTAT_INCR(xuiostat_onloan_wbuf, -nblk);
}

/*
 * Initialize iov[priv->next] and priv->bufs[priv->next] with { off, n, abuf }
 * and increase priv->next by 1.
 */
int
dmu_xuio_add(xuio_t *xuio, arc_buf_t *abuf, offset_t off, size_t n)
{
	struct iovec *iov;
	uio_t *uio = &xuio->xu_uio;
	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
	int i = priv->next++;

	ASSERT(i < priv->cnt);
	ASSERT(off + n <= arc_buf_size(abuf));
	iov = uio->uio_iov + i;
	iov->iov_base = (char *)abuf->b_data + off;
	iov->iov_len = n;
	priv->bufs[i] = abuf;
	return (0);
}

int
dmu_xuio_cnt(xuio_t *xuio)
{
	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
	return (priv->cnt);
}

arc_buf_t *
dmu_xuio_arcbuf(xuio_t *xuio, int i)
{
	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);

	ASSERT(i < priv->cnt);
	return (priv->bufs[i]);
}

void
dmu_xuio_clear(xuio_t *xuio, int i)
{
	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);

	ASSERT(i < priv->cnt);
	priv->bufs[i] = NULL;
}

static void
xuio_stat_init(void)
{
	xuio_ksp = kstat_create("zfs", 0, "xuio_stats", "misc",
	    KSTAT_TYPE_NAMED, sizeof (xuio_stats) / sizeof (kstat_named_t),
	    KSTAT_FLAG_VIRTUAL);
	if (xuio_ksp != NULL) {
		xuio_ksp->ks_data = &xuio_stats;
		kstat_install(xuio_ksp);
	}
}

static void
xuio_stat_fini(void)
{
	if (xuio_ksp != NULL) {
		kstat_delete(xuio_ksp);
		xuio_ksp = NULL;
	}
}

void
xuio_stat_wbuf_copied()
{
	XUIOSTAT_BUMP(xuiostat_wbuf_copied);
}

void
xuio_stat_wbuf_nocopy()
{
	XUIOSTAT_BUMP(xuiostat_wbuf_nocopy);
}

#ifdef _KERNEL
int
dmu_read_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size)
{
	dmu_buf_t **dbp;
	int numbufs, i, err;
	xuio_t *xuio = NULL;

	/*
	 * NB: we could do this block-at-a-time, but it's nice
	 * to be reading in parallel.
	 */
	err = dmu_buf_hold_array(os, object, uio->uio_loffset, size, TRUE, FTAG,
	    &numbufs, &dbp);
	if (err)
		return (err);

	if (uio->uio_extflg == UIO_XUIO)
		xuio = (xuio_t *)uio;

	for (i = 0; i < numbufs; i++) {
		int tocpy;
		int bufoff;
		dmu_buf_t *db = dbp[i];

		ASSERT(size > 0);

		bufoff = uio->uio_loffset - db->db_offset;
		tocpy = (int)MIN(db->db_size - bufoff, size);

		if (xuio) {
			dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
			arc_buf_t *dbuf_abuf = dbi->db_buf;
			arc_buf_t *abuf = dbuf_loan_arcbuf(dbi);
			err = dmu_xuio_add(xuio, abuf, bufoff, tocpy);
			if (!err) {
				uio->uio_resid -= tocpy;
				uio->uio_loffset += tocpy;
			}

			if (abuf == dbuf_abuf)
				XUIOSTAT_BUMP(xuiostat_rbuf_nocopy);
			else
				XUIOSTAT_BUMP(xuiostat_rbuf_copied);
		} else {
			err = uiomove((char *)db->db_data + bufoff, tocpy,
			    UIO_READ, uio);
		}
		if (err)
			break;

		size -= tocpy;
	}
	dmu_buf_rele_array(dbp, numbufs, FTAG);

	return (err);
}

static int
dmu_write_uio_dnode(dnode_t *dn, uio_t *uio, uint64_t size, dmu_tx_t *tx)
{
	dmu_buf_t **dbp;
	int numbufs;
	int err = 0;
	int i;

	err = dmu_buf_hold_array_by_dnode(dn, uio->uio_loffset, size,
	    FALSE, FTAG, &numbufs, &dbp, DMU_READ_PREFETCH);
	if (err)
		return (err);

	for (i = 0; i < numbufs; i++) {
		int tocpy;
		int bufoff;
		dmu_buf_t *db = dbp[i];

		ASSERT(size > 0);

		bufoff = uio->uio_loffset - db->db_offset;
		tocpy = (int)MIN(db->db_size - bufoff, size);

		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);

		if (tocpy == db->db_size)
			dmu_buf_will_fill(db, tx);
		else
			dmu_buf_will_dirty(db, tx);

		/*
		 * XXX uiomove could block forever (e.g. nfs-backed
		 * pages).  There needs to be a uiolockdown() function
		 * to lock the pages in memory, so that uiomove won't
		 * block.
		 */
		err = uiomove((char *)db->db_data + bufoff, tocpy,
		    UIO_WRITE, uio);

		if (tocpy == db->db_size)
			dmu_buf_fill_done(db, tx);

		if (err)
			break;

		size -= tocpy;
	}

	dmu_buf_rele_array(dbp, numbufs, FTAG);
	return (err);
}

int
dmu_write_uio_dbuf(dmu_buf_t *zdb, uio_t *uio, uint64_t size,
    dmu_tx_t *tx)
{
	if (size == 0)
		return (0);

	return (dmu_write_uio_dnode(((dmu_buf_impl_t *)zdb)->db_dnode,
	    uio, size, tx));
}

int
dmu_write_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size,
    dmu_tx_t *tx)
{
	dnode_t *dn;
	int err;

	if (size == 0)
		return (0);

	err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);

	err = dmu_write_uio_dnode(dn, uio, size, tx);

	dnode_rele(dn, FTAG);

	return (err);
}

int
dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
    page_t *pp, dmu_tx_t *tx)
{
	dmu_buf_t **dbp;
	int numbufs, i;
	int err;

	if (size == 0)
		return (0);

	err = dmu_buf_hold_array(os, object, offset, size,
	    FALSE, FTAG, &numbufs, &dbp);
	if (err)
		return (err);

	for (i = 0; i < numbufs; i++) {
		int tocpy, copied, thiscpy;
		int bufoff;
		dmu_buf_t *db = dbp[i];
		caddr_t va;

		ASSERT(size > 0);
		ASSERT3U(db->db_size, >=, PAGESIZE);

		bufoff = offset - db->db_offset;
		tocpy = (int)MIN(db->db_size - bufoff, size);

		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);

		if (tocpy == db->db_size)
			dmu_buf_will_fill(db, tx);
		else
			dmu_buf_will_dirty(db, tx);

		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
			thiscpy = MIN(PAGESIZE, tocpy - copied);
			va = zfs_map_page(pp, S_READ);
			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
			zfs_unmap_page(pp, va);
			pp = pp->p_next;
			bufoff += PAGESIZE;
		}

		if (tocpy == db->db_size)
			dmu_buf_fill_done(db, tx);

		offset += tocpy;
		size -= tocpy;
	}
	dmu_buf_rele_array(dbp, numbufs, FTAG);
	return (err);
}
#endif

/*
 * Allocate a loaned anonymous arc buffer.
 */
arc_buf_t *
dmu_request_arcbuf(dmu_buf_t *handle, int size)
{
	dnode_t *dn = ((dmu_buf_impl_t *)handle)->db_dnode;

	return (arc_loan_buf(dn->dn_objset->os_spa, size));
}

/*
 * Free a loaned arc buffer.
 */
void
dmu_return_arcbuf(arc_buf_t *buf)
{
	arc_return_buf(buf, FTAG);
	VERIFY(arc_buf_remove_ref(buf, FTAG) == 1);
}

/*
 * When possible, directly assign the passed loaned arc buffer to a dbuf.
 * If this is not possible, copy the contents of the passed arc buffer
 * via dmu_write().
 */
void
dmu_assign_arcbuf(dmu_buf_t *handle, uint64_t offset, arc_buf_t *buf,
    dmu_tx_t *tx)
{
	dnode_t *dn = ((dmu_buf_impl_t *)handle)->db_dnode;
	dmu_buf_impl_t *db;
	uint32_t blksz = (uint32_t)arc_buf_size(buf);
	uint64_t blkid;

	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	blkid = dbuf_whichblock(dn, offset);
	VERIFY((db = dbuf_hold(dn, blkid, FTAG)) != NULL);
	rw_exit(&dn->dn_struct_rwlock);

	if (offset == db->db.db_offset && blksz == db->db.db_size) {
		dbuf_assign_arcbuf(db, buf, tx);
		dbuf_rele(db, FTAG);
	} else {
		dbuf_rele(db, FTAG);
		dmu_write(dn->dn_objset, dn->dn_object, offset, blksz,
		    buf->b_data, tx);
		dmu_return_arcbuf(buf);
		XUIOSTAT_BUMP(xuiostat_wbuf_copied);
	}
}
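
/*
 * Example (hypothetical caller, sketch only) of the loaned-arcbuf write
 * path above, which avoids an extra copy when the buffer lines up with
 * the target block: borrow a buffer, fill it, then hand it back via
 * dmu_assign_arcbuf() under an already-assigned tx.
 *
 *	arc_buf_t *abuf = dmu_request_arcbuf(bonus_db, blksz);
 *	bcopy(src, abuf->b_data, blksz);
 *	dmu_assign_arcbuf(bonus_db, offset, abuf, tx);
 *
 * If the caller bails out before assigning the buffer (e.g. because the
 * tx could not be assigned), it should call dmu_return_arcbuf() instead.
 */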

typedef struct {
	dbuf_dirty_record_t	*dsa_dr;
	dmu_sync_cb_t		*dsa_done;
	zgd_t			*dsa_zgd;
	dmu_tx_t		*dsa_tx;
} dmu_sync_arg_t;

/* ARGSUSED */
static void
dmu_sync_ready(zio_t *zio, arc_buf_t *buf, void *varg)
{
	dmu_sync_arg_t *dsa = varg;
	dmu_buf_t *db = dsa->dsa_zgd->zgd_db;
	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
	blkptr_t *bp = zio->io_bp;

	if (zio->io_error == 0) {
		if (BP_IS_HOLE(bp)) {
			/*
			 * A block of zeros may compress to a hole, but the
			 * block size still needs to be known for replay.
			 */
			BP_SET_LSIZE(bp, db->db_size);
		} else {
			ASSERT(BP_GET_TYPE(bp) == dn->dn_type);
			ASSERT(BP_GET_LEVEL(bp) == 0);
			bp->blk_fill = 1;
		}
	}
}

static void
dmu_sync_late_arrival_ready(zio_t *zio)
{
	dmu_sync_ready(zio, NULL, zio->io_private);
}

/* ARGSUSED */
static void
dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
{
	dmu_sync_arg_t *dsa = varg;
	dbuf_dirty_record_t *dr = dsa->dsa_dr;
	dmu_buf_impl_t *db = dr->dr_dbuf;

	mutex_enter(&db->db_mtx);
	ASSERT(dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC);
	if (zio->io_error == 0) {
		dr->dt.dl.dr_overridden_by = *zio->io_bp;
		dr->dt.dl.dr_override_state = DR_OVERRIDDEN;
		dr->dt.dl.dr_copies = zio->io_prop.zp_copies;
		if (BP_IS_HOLE(&dr->dt.dl.dr_overridden_by))
			BP_ZERO(&dr->dt.dl.dr_overridden_by);
	} else {
		dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
	}
	cv_broadcast(&db->db_changed);
	mutex_exit(&db->db_mtx);

	dsa->dsa_done(dsa->dsa_zgd, zio->io_error);

	kmem_free(dsa, sizeof (*dsa));
}

static void
dmu_sync_late_arrival_done(zio_t *zio)
{
	blkptr_t *bp = zio->io_bp;
	dmu_sync_arg_t *dsa = zio->io_private;

	if (zio->io_error == 0 && !BP_IS_HOLE(bp)) {
		ASSERT(zio->io_bp->blk_birth == zio->io_txg);
		ASSERT(zio->io_txg > spa_syncing_txg(zio->io_spa));
		zio_free(zio->io_spa, zio->io_txg, zio->io_bp);
	}

	dmu_tx_commit(dsa->dsa_tx);

	dsa->dsa_done(dsa->dsa_zgd, zio->io_error);

	kmem_free(dsa, sizeof (*dsa));
}

static int
dmu_sync_late_arrival(zio_t *pio, objset_t *os, dmu_sync_cb_t *done, zgd_t *zgd,
    zio_prop_t *zp, zbookmark_t *zb)
{
	dmu_sync_arg_t *dsa;
	dmu_tx_t *tx;

	tx = dmu_tx_create(os);
	dmu_tx_hold_space(tx, zgd->zgd_db->db_size);
	if (dmu_tx_assign(tx, TXG_WAIT) != 0) {
		dmu_tx_abort(tx);
		return (EIO);	/* Make zl_get_data do txg_wait_synced() */
	}

	dsa = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
	dsa->dsa_dr = NULL;
	dsa->dsa_done = done;
	dsa->dsa_zgd = zgd;
	dsa->dsa_tx = tx;

	zio_nowait(zio_write(pio, os->os_spa, dmu_tx_get_txg(tx), zgd->zgd_bp,
	    zgd->zgd_db->db_data, zgd->zgd_db->db_size, zp,
	    dmu_sync_late_arrival_ready, dmu_sync_late_arrival_done, dsa,
	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_CANFAIL, zb));

	return (0);
}

/*
 * Intent log support: sync the block associated with db to disk.
 * N.B. and XXX: the caller is responsible for making sure that the
 * data isn't changing while dmu_sync() is writing it.
 *
 * Return values:
 *
 *	EEXIST: this txg has already been synced, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	EALREADY: this block is already in the process of being synced.
 *		The caller should track its progress (somehow).
 *
 *	EIO: could not do the I/O.
 *		The caller should do a txg_wait_synced().
 *
 *	0: the I/O has been initiated.
 *		The caller should log this blkptr in the done callback.
 *		It is possible that the I/O will fail, in which case
 *		the error will be reported to the done callback and
 *		propagated to pio from zio_done().
 */
int
dmu_sync(zio_t *pio, uint64_t txg, dmu_sync_cb_t *done, zgd_t *zgd)
{
	blkptr_t *bp = zgd->zgd_bp;
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)zgd->zgd_db;
	objset_t *os = db->db_objset;
	dsl_dataset_t *ds = os->os_dsl_dataset;
	dbuf_dirty_record_t *dr;
	dmu_sync_arg_t *dsa;
	zbookmark_t zb;
	zio_prop_t zp;

	ASSERT(pio != NULL);
	ASSERT(BP_IS_HOLE(bp));
	ASSERT(txg != 0);

	SET_BOOKMARK(&zb, ds->ds_object,
	    db->db.db_object, db->db_level, db->db_blkid);

	dmu_write_policy(os, db->db_dnode, db->db_level, WP_DMU_SYNC, &zp);

	/*
	 * If we're frozen (running ziltest), we always need to generate a bp.
	 */
	if (txg > spa_freeze_txg(os->os_spa))
		return (dmu_sync_late_arrival(pio, os, done, zgd, &zp, &zb));

	/*
	 * Grabbing db_mtx now provides a barrier between dbuf_sync_leaf()
	 * and us.  If we determine that this txg is not yet syncing,
	 * but it begins to sync a moment later, that's OK because the
	 * sync thread will block in dbuf_sync_leaf() until we drop db_mtx.
	 */
	mutex_enter(&db->db_mtx);

	if (txg <= spa_last_synced_txg(os->os_spa)) {
		/*
		 * This txg has already synced.  There's nothing to do.
		 */
		mutex_exit(&db->db_mtx);
		return (EEXIST);
	}

	if (txg <= spa_syncing_txg(os->os_spa)) {
		/*
		 * This txg is currently syncing, so we can't mess with
		 * the dirty record anymore; just write a new log block.
		 */
		mutex_exit(&db->db_mtx);
		return (dmu_sync_late_arrival(pio, os, done, zgd, &zp, &zb));
	}

	dr = db->db_last_dirty;
	while (dr && dr->dr_txg != txg)
		dr = dr->dr_next;

	if (dr == NULL) {
		/*
		 * There's no dr for this dbuf, so it must have been freed.
		 * There's no need to log writes to freed blocks, so we're done.
		 */
		mutex_exit(&db->db_mtx);
		return (ENOENT);
	}

	ASSERT(dr->dr_txg == txg);
	if (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC ||
	    dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
		/*
		 * We have already issued a sync write for this buffer,
		 * or this buffer has already been synced.  It could not
		 * have been dirtied since, or we would have cleared the state.
		 */
		mutex_exit(&db->db_mtx);
		return (EALREADY);
	}

	ASSERT(dr->dt.dl.dr_override_state == DR_NOT_OVERRIDDEN);
	dr->dt.dl.dr_override_state = DR_IN_DMU_SYNC;
	mutex_exit(&db->db_mtx);

	dsa = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
	dsa->dsa_dr = dr;
	dsa->dsa_done = done;
	dsa->dsa_zgd = zgd;
	dsa->dsa_tx = NULL;

	zio_nowait(arc_write(pio, os->os_spa, txg,
	    bp, dr->dt.dl.dr_data, DBUF_IS_L2CACHEABLE(db), &zp,
	    dmu_sync_ready, dmu_sync_done, dsa,
	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_CANFAIL, &zb));

	return (0);
}
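
/*
 * Example (hypothetical ZIL get_data callback, sketch only; zil_done_cb
 * is a placeholder name) of acting on the dmu_sync() return values
 * documented above:
 *
 *	error = dmu_sync(pio, txg, zil_done_cb, zgd);
 *	if (error == EEXIST || error == ENOENT) {
 *		... already synced or freed: do not log the write ...
 *	} else if (error == EIO) {
 *		txg_wait_synced(dmu_objset_pool(os), txg);
 *	} else if (error == 0) {
 *		... the resulting blkptr is logged from zil_done_cb() ...
 *	}
 */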

int
dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
	dmu_tx_t *tx)
{
	dnode_t *dn;
	int err;

	err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);
	err = dnode_set_blksz(dn, size, ibs, tx);
	dnode_rele(dn, FTAG);
	return (err);
}

void
dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
	dmu_tx_t *tx)
{
	dnode_t *dn;

	/* XXX assumes dnode_hold will not get an i/o error */
	(void) dnode_hold(os, object, FTAG, &dn);
	ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
	dn->dn_checksum = checksum;
	dnode_setdirty(dn, tx);
	dnode_rele(dn, FTAG);
}

void
dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
	dmu_tx_t *tx)
{
	dnode_t *dn;

	/* XXX assumes dnode_hold will not get an i/o error */
	(void) dnode_hold(os, object, FTAG, &dn);
	ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
	dn->dn_compress = compress;
	dnode_setdirty(dn, tx);
	dnode_rele(dn, FTAG);
}

int zfs_mdcomp_disable = 0;

void
dmu_write_policy(objset_t *os, dnode_t *dn, int level, int wp, zio_prop_t *zp)
{
	dmu_object_type_t type = dn ? dn->dn_type : DMU_OT_OBJSET;
	boolean_t ismd = (level > 0 || dmu_ot[type].ot_metadata);
	enum zio_checksum checksum = os->os_checksum;
	enum zio_compress compress = os->os_compress;
	enum zio_checksum dedup_checksum = os->os_dedup_checksum;
	boolean_t dedup;
	boolean_t dedup_verify = os->os_dedup_verify;
	int copies = os->os_copies;

	/*
	 * Determine checksum setting.
	 */
	if (ismd) {
		/*
		 * Metadata always gets checksummed.  If the data
		 * checksum is multi-bit correctable, and it's not a
		 * ZBT-style checksum, then it's suitable for metadata
		 * as well.  Otherwise, the metadata checksum defaults
		 * to fletcher4.
		 */
		if (zio_checksum_table[checksum].ci_correctable < 1 ||
		    zio_checksum_table[checksum].ci_eck)
			checksum = ZIO_CHECKSUM_FLETCHER_4;
	} else {
		checksum = zio_checksum_select(dn->dn_checksum, checksum);
	}

	/*
	 * Determine compression setting.
	 */
	if (ismd) {
		/*
		 * XXX -- we should design a compression algorithm
		 * that specializes in arrays of bps.
		 */
		compress = zfs_mdcomp_disable ? ZIO_COMPRESS_EMPTY :
		    ZIO_COMPRESS_LZJB;
	} else {
		compress = zio_compress_select(dn->dn_compress, compress);
	}

	/*
	 * Determine dedup setting.  If we are in dmu_sync(), we won't
	 * actually dedup now because that's all done in syncing context;
	 * but we do want to use the dedup checksum.  If the checksum is not
	 * strong enough to ensure unique signatures, force dedup_verify.
	 */
	dedup = (!ismd && dedup_checksum != ZIO_CHECKSUM_OFF);
	if (dedup) {
		checksum = dedup_checksum;
		if (!zio_checksum_table[checksum].ci_dedup)
			dedup_verify = 1;
	}

	if (wp & WP_DMU_SYNC)
		dedup = 0;

	if (wp & WP_NOFILL) {
		ASSERT(!ismd && level == 0);
		checksum = ZIO_CHECKSUM_OFF;
		compress = ZIO_COMPRESS_OFF;
		dedup = B_FALSE;
	}

	zp->zp_checksum = checksum;
	zp->zp_compress = compress;
	zp->zp_type = (wp & WP_SPILL) ? dn->dn_bonustype : type;
	zp->zp_level = level;
	zp->zp_copies = MIN(copies + ismd, spa_max_replication(os->os_spa));
	zp->zp_dedup = dedup;
	zp->zp_dedup_verify = dedup && dedup_verify;
}

int
dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
{
	dnode_t *dn;
	int i, err;

	err = dnode_hold(os, object, FTAG, &dn);
	if (err)
		return (err);
	/*
	 * Sync any current changes before
	 * we go trundling through the block pointers.
	 */
	for (i = 0; i < TXG_SIZE; i++) {
		if (list_link_active(&dn->dn_dirty_link[i]))
			break;
	}
	if (i != TXG_SIZE) {
		dnode_rele(dn, FTAG);
		txg_wait_synced(dmu_objset_pool(os), 0);
		err = dnode_hold(os, object, FTAG, &dn);
		if (err)
			return (err);
	}

	err = dnode_next_offset(dn, (hole ? DNODE_FIND_HOLE : 0), off, 1, 1, 0);
	dnode_rele(dn, FTAG);

	return (err);
}

void
dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
{
	dnode_phys_t *dnp;

	rw_enter(&dn->dn_struct_rwlock, RW_READER);
	mutex_enter(&dn->dn_mtx);

	dnp = dn->dn_phys;

	doi->doi_data_block_size = dn->dn_datablksz;
	doi->doi_metadata_block_size = dn->dn_indblkshift ?
	    1ULL << dn->dn_indblkshift : 0;
	doi->doi_type = dn->dn_type;
	doi->doi_bonus_type = dn->dn_bonustype;
	doi->doi_bonus_size = dn->dn_bonuslen;
	doi->doi_indirection = dn->dn_nlevels;
	doi->doi_checksum = dn->dn_checksum;
	doi->doi_compress = dn->dn_compress;
	doi->doi_physical_blocks_512 = (DN_USED_BYTES(dnp) + 256) >> 9;
	doi->doi_max_offset = (dnp->dn_maxblkid + 1) * dn->dn_datablksz;
	doi->doi_fill_count = 0;
	for (int i = 0; i < dnp->dn_nblkptr; i++)
		doi->doi_fill_count += dnp->dn_blkptr[i].blk_fill;

	mutex_exit(&dn->dn_mtx);
	rw_exit(&dn->dn_struct_rwlock);
}

/*
 * Get information on a DMU object.
 * If doi is NULL, just indicates whether the object exists.
 */
int
dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
{
	dnode_t *dn;
	int err = dnode_hold(os, object, FTAG, &dn);

	if (err)
		return (err);

	if (doi != NULL)
		dmu_object_info_from_dnode(dn, doi);

	dnode_rele(dn, FTAG);
	return (0);
}

/*
 * As above, but faster; can be used when you have a held dbuf in hand.
 */
void
dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
{
	dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
}

/*
 * Faster still when you only care about the size.
 * This is specifically optimized for zfs_getattr().
 */
void
dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
{
	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;

	*blksize = dn->dn_datablksz;
	/* add 1 for dnode space */
	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
	    SPA_MINBLOCKSHIFT) + 1;
}

void
byteswap_uint64_array(void *vbuf, size_t size)
{
	uint64_t *buf = vbuf;
	size_t count = size >> 3;
	int i;

	ASSERT((size & 7) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_64(buf[i]);
}

void
byteswap_uint32_array(void *vbuf, size_t size)
{
	uint32_t *buf = vbuf;
	size_t count = size >> 2;
	int i;

	ASSERT((size & 3) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_32(buf[i]);
}

void
byteswap_uint16_array(void *vbuf, size_t size)
{
	uint16_t *buf = vbuf;
	size_t count = size >> 1;
	int i;

	ASSERT((size & 1) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_16(buf[i]);
}

/* ARGSUSED */
void
byteswap_uint8_array(void *vbuf, size_t size)
{
}

void
dmu_init(void)
{
	dbuf_init();
	dnode_init();
	zfetch_init();
	arc_init();
	l2arc_init();
	xuio_stat_init();
	sa_cache_init();
}

void
dmu_fini(void)
{
	arc_fini();
	zfetch_fini();
	dnode_fini();
	dbuf_fini();
	l2arc_fini();
	xuio_stat_fini();
	sa_cache_fini();
}