dmu.c revision 975c32a05c38c6fa808592dd35fa6dba183ca077
1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
22 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23 * Use is subject to license terms.
24 */
25
26#include <sys/dmu.h>
27#include <sys/dmu_impl.h>
28#include <sys/dmu_tx.h>
29#include <sys/dbuf.h>
30#include <sys/dnode.h>
31#include <sys/zfs_context.h>
32#include <sys/dmu_objset.h>
33#include <sys/dmu_traverse.h>
34#include <sys/dsl_dataset.h>
35#include <sys/dsl_dir.h>
36#include <sys/dsl_pool.h>
37#include <sys/dsl_synctask.h>
38#include <sys/dsl_prop.h>
39#include <sys/dmu_zfetch.h>
40#include <sys/zfs_ioctl.h>
41#include <sys/zap.h>
42#include <sys/zio_checksum.h>
43#ifdef _KERNEL
44#include <sys/vmsystm.h>
45#include <sys/zfs_znode.h>
46#endif
47
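/*
 * Table of DMU object types, indexed by dmu_object_type_t.  Each entry
 * supplies the byteswap routine for the type, a flag indicating whether
 * the type holds metadata, and a descriptive name.
 */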
48const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
49	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
50	{	zap_byteswap,		TRUE,	"object directory"	},
51	{	byteswap_uint64_array,	TRUE,	"object array"		},
52	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
53	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
54	{	byteswap_uint64_array,	TRUE,	"bplist"		},
55	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
56	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
57	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
58	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
59	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
60	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
61	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
62	{	zap_byteswap,		TRUE,	"DSL directory child map"},
63	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
64	{	zap_byteswap,		TRUE,	"DSL props"		},
65	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
66	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
67	{	zfs_oldacl_byteswap,	TRUE,	"ZFS V0 ACL"		},
68	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
69	{	zap_byteswap,		TRUE,	"ZFS directory"		},
70	{	zap_byteswap,		TRUE,	"ZFS master node"	},
71	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
72	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
73	{	zap_byteswap,		TRUE,	"zvol prop"		},
74	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
75	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
76	{	zap_byteswap,		TRUE,	"other ZAP"		},
77	{	zap_byteswap,		TRUE,	"persistent error log"	},
78	{	byteswap_uint8_array,	TRUE,	"SPA history"		},
79	{	byteswap_uint64_array,	TRUE,	"SPA history offsets"	},
80	{	zap_byteswap,		TRUE,	"Pool properties"	},
81	{	zap_byteswap,		TRUE,	"DSL permissions"	},
82	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
83	{	byteswap_uint8_array,	TRUE,	"ZFS SYSACL"		},
84	{	byteswap_uint8_array,	TRUE,	"FUID table"		},
85	{	byteswap_uint64_array,	TRUE,	"FUID table size"	},
86	{	zap_byteswap,		TRUE,	"DSL dataset next clones"},
87	{	zap_byteswap,		TRUE,	"scrub work queue"	},
88	{	zap_byteswap,		TRUE,	"ZFS user/group used"	},
89	{	zap_byteswap,		TRUE,	"ZFS user/group quota"	},
90	{	zap_byteswap,		TRUE,	"snapshot refcount tags"},
91};
92
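/*
 * Hold the single buffer that contains the given offset in the given
 * object, reading it in if necessary, and return it in *dbp.  Returns 0
 * on success or an error (e.g. ENOENT, EIO); the hold is released with
 * dmu_buf_rele().
 */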
93int
94dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
95    void *tag, dmu_buf_t **dbp)
96{
97	dnode_t *dn;
98	uint64_t blkid;
99	dmu_buf_impl_t *db;
100	int err;
101
102	err = dnode_hold(os, object, FTAG, &dn);
103	if (err)
104		return (err);
105	blkid = dbuf_whichblock(dn, offset);
106	rw_enter(&dn->dn_struct_rwlock, RW_READER);
107	db = dbuf_hold(dn, blkid, tag);
108	rw_exit(&dn->dn_struct_rwlock);
109	if (db == NULL) {
110		err = EIO;
111	} else {
112		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
113		if (err) {
114			dbuf_rele(db, tag);
115			db = NULL;
116		}
117	}
118
119	dnode_rele(dn, FTAG);
120	*dbp = &db->db;
121	return (err);
122}
123
124int
125dmu_bonus_max(void)
126{
127	return (DN_MAX_BONUSLEN);
128}
129
130int
131dmu_set_bonus(dmu_buf_t *db, int newsize, dmu_tx_t *tx)
132{
133	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
134
135	if (dn->dn_bonus != (dmu_buf_impl_t *)db)
136		return (EINVAL);
137	if (newsize < 0 || newsize > db->db_size)
138		return (EINVAL);
139	dnode_setbonuslen(dn, newsize, tx);
140	return (0);
141}
142
143/*
144 * returns ENOENT, EIO, or 0.
145 */
146int
147dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
148{
149	dnode_t *dn;
150	dmu_buf_impl_t *db;
151	int error;
152
153	error = dnode_hold(os, object, FTAG, &dn);
154	if (error)
155		return (error);
156
157	rw_enter(&dn->dn_struct_rwlock, RW_READER);
158	if (dn->dn_bonus == NULL) {
159		rw_exit(&dn->dn_struct_rwlock);
160		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
161		if (dn->dn_bonus == NULL)
162			dbuf_create_bonus(dn);
163	}
164	db = dn->dn_bonus;
165	rw_exit(&dn->dn_struct_rwlock);
166
167	/* as long as the bonus buf is held, the dnode will be held */
168	if (refcount_add(&db->db_holds, tag) == 1)
169		VERIFY(dnode_add_ref(dn, db));
170
171	dnode_rele(dn, FTAG);
172
173	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
174
175	*dbp = &db->db;
176	return (0);
177}
178
179/*
180 * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
181 * to take a held dnode rather than <os, object> -- the lookup is wasteful,
182 * and can induce severe lock contention when writing to several files
183 * whose dnodes are in the same block.
184 */
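/*
 * Hold (and, when "read" is set, read in) every buffer that overlaps the
 * range [offset, offset + length).  On success the buffers are returned
 * in *dbpp and their count in *numbufsp; the caller releases them with
 * dmu_buf_rele_array().
 */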
185static int
186dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset, uint64_t length,
187    int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp, uint32_t flags)
188{
189	dsl_pool_t *dp = NULL;
190	dmu_buf_t **dbp;
191	uint64_t blkid, nblks, i;
192	uint32_t dbuf_flags;
193	int err;
194	zio_t *zio;
195	hrtime_t start;
196
197	ASSERT(length <= DMU_MAX_ACCESS);
198
199	dbuf_flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT | DB_RF_HAVESTRUCT;
200	if (flags & DMU_READ_NO_PREFETCH || length > zfetch_array_rd_sz)
201		dbuf_flags |= DB_RF_NOPREFETCH;
202
203	rw_enter(&dn->dn_struct_rwlock, RW_READER);
204	if (dn->dn_datablkshift) {
205		int blkshift = dn->dn_datablkshift;
206		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
207		    P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
208	} else {
209		if (offset + length > dn->dn_datablksz) {
210			zfs_panic_recover("zfs: accessing past end of object "
211			    "%llx/%llx (size=%u access=%llu+%llu)",
212			    (longlong_t)dn->dn_objset->
213			    os_dsl_dataset->ds_object,
214			    (longlong_t)dn->dn_object, dn->dn_datablksz,
215			    (longlong_t)offset, (longlong_t)length);
216			rw_exit(&dn->dn_struct_rwlock);
217			return (EIO);
218		}
219		nblks = 1;
220	}
221	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
222
223	if (dn->dn_objset->os_dsl_dataset)
224		dp = dn->dn_objset->os_dsl_dataset->ds_dir->dd_pool;
225	if (dp && dsl_pool_sync_context(dp))
226		start = gethrtime();
227	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, ZIO_FLAG_CANFAIL);
228	blkid = dbuf_whichblock(dn, offset);
229	for (i = 0; i < nblks; i++) {
230		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
231		if (db == NULL) {
232			rw_exit(&dn->dn_struct_rwlock);
233			dmu_buf_rele_array(dbp, nblks, tag);
234			zio_nowait(zio);
235			return (EIO);
236		}
237		/* initiate async i/o */
238		if (read) {
239			(void) dbuf_read(db, zio, dbuf_flags);
240		}
241		dbp[i] = &db->db;
242	}
243	rw_exit(&dn->dn_struct_rwlock);
244
245	/* wait for async i/o */
246	err = zio_wait(zio);
247	/* track read overhead when we are in sync context */
248	if (dp && dsl_pool_sync_context(dp))
249		dp->dp_read_overhead += gethrtime() - start;
250	if (err) {
251		dmu_buf_rele_array(dbp, nblks, tag);
252		return (err);
253	}
254
255	/* wait for other io to complete */
256	if (read) {
257		for (i = 0; i < nblks; i++) {
258			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
259			mutex_enter(&db->db_mtx);
260			while (db->db_state == DB_READ ||
261			    db->db_state == DB_FILL)
262				cv_wait(&db->db_changed, &db->db_mtx);
263			if (db->db_state == DB_UNCACHED)
264				err = EIO;
265			mutex_exit(&db->db_mtx);
266			if (err) {
267				dmu_buf_rele_array(dbp, nblks, tag);
268				return (err);
269			}
270		}
271	}
272
273	*numbufsp = nblks;
274	*dbpp = dbp;
275	return (0);
276}
277
278static int
279dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
280    uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
281{
282	dnode_t *dn;
283	int err;
284
285	err = dnode_hold(os, object, FTAG, &dn);
286	if (err)
287		return (err);
288
289	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
290	    numbufsp, dbpp, DMU_READ_PREFETCH);
291
292	dnode_rele(dn, FTAG);
293
294	return (err);
295}
296
297int
298dmu_buf_hold_array_by_bonus(dmu_buf_t *db, uint64_t offset,
299    uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
300{
301	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
302	int err;
303
304	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
305	    numbufsp, dbpp, DMU_READ_PREFETCH);
306
307	return (err);
308}
309
310void
311dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
312{
313	int i;
314	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
315
316	if (numbufs == 0)
317		return;
318
319	for (i = 0; i < numbufs; i++) {
320		if (dbp[i])
321			dbuf_rele(dbp[i], tag);
322	}
323
324	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
325}
326
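/*
 * Issue prefetch reads for the blocks covering [offset, offset + len) of
 * the given object.  A len of zero asks for the block holding the dnode
 * itself (for callers interested in the bonus buffer).  This is purely
 * advisory: errors are ignored and nothing is returned.
 */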
327void
328dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
329{
330	dnode_t *dn;
331	uint64_t blkid;
332	int nblks, i, err;
333
334	if (zfs_prefetch_disable)
335		return;
336
337	if (len == 0) {  /* they're interested in the bonus buffer */
338		dn = os->os_meta_dnode;
339
340		if (object == 0 || object >= DN_MAX_OBJECT)
341			return;
342
343		rw_enter(&dn->dn_struct_rwlock, RW_READER);
344		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
345		dbuf_prefetch(dn, blkid);
346		rw_exit(&dn->dn_struct_rwlock);
347		return;
348	}
349
350	/*
351	 * XXX - Note, if the dnode for the requested object is not
352	 * already cached, we will do a *synchronous* read in the
353	 * dnode_hold() call.  The same is true for any indirects.
354	 */
355	err = dnode_hold(os, object, FTAG, &dn);
356	if (err != 0)
357		return;
358
359	rw_enter(&dn->dn_struct_rwlock, RW_READER);
360	if (dn->dn_datablkshift) {
361		int blkshift = dn->dn_datablkshift;
362		nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
363		    P2ALIGN(offset, 1<<blkshift)) >> blkshift;
364	} else {
365		nblks = (offset < dn->dn_datablksz);
366	}
367
368	if (nblks != 0) {
369		blkid = dbuf_whichblock(dn, offset);
370		for (i = 0; i < nblks; i++)
371			dbuf_prefetch(dn, blkid+i);
372	}
373
374	rw_exit(&dn->dn_struct_rwlock);
375
376	dnode_rele(dn, FTAG);
377}
378
379/*
380 * Get the next "chunk" of file data to free.  We traverse the file from
381 * the end so that the file gets shorter over time (if we crash in the
382 * middle, this will leave us in a better state).  We find allocated file
383 * data by simply searching the allocated level 1 indirects.
384 */
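/*
 * For scale: each level 1 indirect block maps "iblkrange" bytes of file
 * data.  With (for example) 128K data blocks and 16K indirect blocks
 * (128 block pointers per indirect), one L1 indirect covers 16MB of the
 * file; these figures are only illustrative defaults.
 */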
385static int
386get_next_chunk(dnode_t *dn, uint64_t *start, uint64_t limit)
387{
388	uint64_t len = *start - limit;
389	uint64_t blkcnt = 0;
390	uint64_t maxblks = DMU_MAX_ACCESS / (1ULL << (dn->dn_indblkshift + 1));
391	uint64_t iblkrange =
392	    dn->dn_datablksz * EPB(dn->dn_indblkshift, SPA_BLKPTRSHIFT);
393
394	ASSERT(limit <= *start);
395
396	if (len <= iblkrange * maxblks) {
397		*start = limit;
398		return (0);
399	}
400	ASSERT(ISP2(iblkrange));
401
402	while (*start > limit && blkcnt < maxblks) {
403		int err;
404
405		/* find next allocated L1 indirect */
406		err = dnode_next_offset(dn,
407		    DNODE_FIND_BACKWARDS, start, 2, 1, 0);
408
409		/* if there are no more, then we are done */
410		if (err == ESRCH) {
411			*start = limit;
412			return (0);
413		} else if (err) {
414			return (err);
415		}
416		blkcnt += 1;
417
418		/* reset offset to end of "next" block back */
419		*start = P2ALIGN(*start, iblkrange);
420		if (*start <= limit)
421			*start = limit;
422		else
423			*start -= 1;
424	}
425	return (0);
426}
427
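/*
 * Free the requested range in chunks, assigning each chunk to its own
 * transaction so that an arbitrarily large free cannot overwhelm a
 * single txg.  If free_dnode is set and the free truncates the object to
 * zero length, the dnode itself is freed in the final transaction.
 */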
428static int
429dmu_free_long_range_impl(objset_t *os, dnode_t *dn, uint64_t offset,
430    uint64_t length, boolean_t free_dnode)
431{
432	dmu_tx_t *tx;
433	uint64_t object_size, start, end, len;
434	boolean_t trunc = (length == DMU_OBJECT_END);
435	int align, err;
436
437	align = 1 << dn->dn_datablkshift;
438	ASSERT(align > 0);
439	object_size = align == 1 ? dn->dn_datablksz :
440	    (dn->dn_maxblkid + 1) << dn->dn_datablkshift;
441
442	end = offset + length;
443	if (trunc || end > object_size)
444		end = object_size;
445	if (end <= offset)
446		return (0);
447	length = end - offset;
448
449	while (length) {
450		start = end;
451		/* assert(offset <= start) */
452		err = get_next_chunk(dn, &start, offset);
453		if (err)
454			return (err);
455		len = trunc ? DMU_OBJECT_END : end - start;
456
457		tx = dmu_tx_create(os);
458		dmu_tx_hold_free(tx, dn->dn_object, start, len);
459		err = dmu_tx_assign(tx, TXG_WAIT);
460		if (err) {
461			dmu_tx_abort(tx);
462			return (err);
463		}
464
465		dnode_free_range(dn, start, trunc ? -1 : len, tx);
466
467		if (start == 0 && free_dnode) {
468			ASSERT(trunc);
469			dnode_free(dn, tx);
470		}
471
472		length -= end - start;
473
474		dmu_tx_commit(tx);
475		end = start;
476	}
477	return (0);
478}
479
480int
481dmu_free_long_range(objset_t *os, uint64_t object,
482    uint64_t offset, uint64_t length)
483{
484	dnode_t *dn;
485	int err;
486
487	err = dnode_hold(os, object, FTAG, &dn);
488	if (err != 0)
489		return (err);
490	err = dmu_free_long_range_impl(os, dn, offset, length, FALSE);
491	dnode_rele(dn, FTAG);
492	return (err);
493}
494
495int
496dmu_free_object(objset_t *os, uint64_t object)
497{
498	dnode_t *dn;
499	dmu_tx_t *tx;
500	int err;
501
502	err = dnode_hold_impl(os, object, DNODE_MUST_BE_ALLOCATED,
503	    FTAG, &dn);
504	if (err != 0)
505		return (err);
506	if (dn->dn_nlevels == 1) {
507		tx = dmu_tx_create(os);
508		dmu_tx_hold_bonus(tx, object);
509		dmu_tx_hold_free(tx, dn->dn_object, 0, DMU_OBJECT_END);
510		err = dmu_tx_assign(tx, TXG_WAIT);
511		if (err == 0) {
512			dnode_free_range(dn, 0, DMU_OBJECT_END, tx);
513			dnode_free(dn, tx);
514			dmu_tx_commit(tx);
515		} else {
516			dmu_tx_abort(tx);
517		}
518	} else {
519		err = dmu_free_long_range_impl(os, dn, 0, DMU_OBJECT_END, TRUE);
520	}
521	dnode_rele(dn, FTAG);
522	return (err);
523}
524
525int
526dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
527    uint64_t size, dmu_tx_t *tx)
528{
529	dnode_t *dn;
530	int err = dnode_hold(os, object, FTAG, &dn);
531	if (err)
532		return (err);
533	ASSERT(offset < UINT64_MAX);
534	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
535	dnode_free_range(dn, offset, size, tx);
536	dnode_rele(dn, FTAG);
537	return (0);
538}
539
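/*
 * Copy "size" bytes starting at "offset" from the given object into buf.
 * For single-block objects, any part of the request beyond the end of
 * the block is zero-filled.
 */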
540int
541dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
542    void *buf, uint32_t flags)
543{
544	dnode_t *dn;
545	dmu_buf_t **dbp;
546	int numbufs, err;
547
548	err = dnode_hold(os, object, FTAG, &dn);
549	if (err)
550		return (err);
551
552	/*
553	 * Deal with odd block sizes, where there can't be data past the first
554	 * block.  If we ever do the tail block optimization, we will need to
555	 * handle that here as well.
556	 */
557	if (dn->dn_maxblkid == 0) {
558		int newsz = offset > dn->dn_datablksz ? 0 :
559		    MIN(size, dn->dn_datablksz - offset);
560		bzero((char *)buf + newsz, size - newsz);
561		size = newsz;
562	}
563
564	while (size > 0) {
565		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
566		int i;
567
568		/*
569		 * NB: we could do this block-at-a-time, but it's nice
570		 * to be reading in parallel.
571		 */
572		err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
573		    TRUE, FTAG, &numbufs, &dbp, flags);
574		if (err)
575			break;
576
577		for (i = 0; i < numbufs; i++) {
578			int tocpy;
579			int bufoff;
580			dmu_buf_t *db = dbp[i];
581
582			ASSERT(size > 0);
583
584			bufoff = offset - db->db_offset;
585			tocpy = (int)MIN(db->db_size - bufoff, size);
586
587			bcopy((char *)db->db_data + bufoff, buf, tocpy);
588
589			offset += tocpy;
590			size -= tocpy;
591			buf = (char *)buf + tocpy;
592		}
593		dmu_buf_rele_array(dbp, numbufs, FTAG);
594	}
595	dnode_rele(dn, FTAG);
596	return (err);
597}
598
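/*
 * Copy "size" bytes from buf into the given object starting at "offset",
 * dirtying the affected buffers as part of the given transaction.  The
 * caller is expected to have reserved the space, e.g. with
 * dmu_tx_hold_write().
 */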
599void
600dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
601    const void *buf, dmu_tx_t *tx)
602{
603	dmu_buf_t **dbp;
604	int numbufs, i;
605
606	if (size == 0)
607		return;
608
609	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
610	    FALSE, FTAG, &numbufs, &dbp));
611
612	for (i = 0; i < numbufs; i++) {
613		int tocpy;
614		int bufoff;
615		dmu_buf_t *db = dbp[i];
616
617		ASSERT(size > 0);
618
619		bufoff = offset - db->db_offset;
620		tocpy = (int)MIN(db->db_size - bufoff, size);
621
622		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
623
624		if (tocpy == db->db_size)
625			dmu_buf_will_fill(db, tx);
626		else
627			dmu_buf_will_dirty(db, tx);
628
629		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
630
631		if (tocpy == db->db_size)
632			dmu_buf_fill_done(db, tx);
633
634		offset += tocpy;
635		size -= tocpy;
636		buf = (char *)buf + tocpy;
637	}
638	dmu_buf_rele_array(dbp, numbufs, FTAG);
639}
640
641void
642dmu_prealloc(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
643    dmu_tx_t *tx)
644{
645	dmu_buf_t **dbp;
646	int numbufs, i;
647
648	if (size == 0)
649		return;
650
651	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
652	    FALSE, FTAG, &numbufs, &dbp));
653
654	for (i = 0; i < numbufs; i++) {
655		dmu_buf_t *db = dbp[i];
656
657		dmu_buf_will_not_fill(db, tx);
658	}
659	dmu_buf_rele_array(dbp, numbufs, FTAG);
660}
661
662#ifdef _KERNEL
663int
664dmu_read_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size)
665{
666	dmu_buf_t **dbp;
667	int numbufs, i, err;
668
669	/*
670	 * NB: we could do this block-at-a-time, but it's nice
671	 * to be reading in parallel.
672	 */
673	err = dmu_buf_hold_array(os, object, uio->uio_loffset, size, TRUE, FTAG,
674	    &numbufs, &dbp);
675	if (err)
676		return (err);
677
678	for (i = 0; i < numbufs; i++) {
679		int tocpy;
680		int bufoff;
681		dmu_buf_t *db = dbp[i];
682
683		ASSERT(size > 0);
684
685		bufoff = uio->uio_loffset - db->db_offset;
686		tocpy = (int)MIN(db->db_size - bufoff, size);
687
688		err = uiomove((char *)db->db_data + bufoff, tocpy,
689		    UIO_READ, uio);
690		if (err)
691			break;
692
693		size -= tocpy;
694	}
695	dmu_buf_rele_array(dbp, numbufs, FTAG);
696
697	return (err);
698}
699
700int
701dmu_write_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size,
702    dmu_tx_t *tx)
703{
704	dmu_buf_t **dbp;
705	int numbufs, i;
706	int err = 0;
707
708	if (size == 0)
709		return (0);
710
711	err = dmu_buf_hold_array(os, object, uio->uio_loffset, size,
712	    FALSE, FTAG, &numbufs, &dbp);
713	if (err)
714		return (err);
715
716	for (i = 0; i < numbufs; i++) {
717		int tocpy;
718		int bufoff;
719		dmu_buf_t *db = dbp[i];
720
721		ASSERT(size > 0);
722
723		bufoff = uio->uio_loffset - db->db_offset;
724		tocpy = (int)MIN(db->db_size - bufoff, size);
725
726		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
727
728		if (tocpy == db->db_size)
729			dmu_buf_will_fill(db, tx);
730		else
731			dmu_buf_will_dirty(db, tx);
732
733		/*
734		 * XXX uiomove could block forever (eg. nfs-backed
735		 * pages).  There needs to be a uiolockdown() function
736		 * to lock the pages in memory, so that uiomove won't
737		 * block.
738		 */
739		err = uiomove((char *)db->db_data + bufoff, tocpy,
740		    UIO_WRITE, uio);
741
742		if (tocpy == db->db_size)
743			dmu_buf_fill_done(db, tx);
744
745		if (err)
746			break;
747
748		size -= tocpy;
749	}
750	dmu_buf_rele_array(dbp, numbufs, FTAG);
751	return (err);
752}
753
754int
755dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
756    page_t *pp, dmu_tx_t *tx)
757{
758	dmu_buf_t **dbp;
759	int numbufs, i;
760	int err;
761
762	if (size == 0)
763		return (0);
764
765	err = dmu_buf_hold_array(os, object, offset, size,
766	    FALSE, FTAG, &numbufs, &dbp);
767	if (err)
768		return (err);
769
770	for (i = 0; i < numbufs; i++) {
771		int tocpy, copied, thiscpy;
772		int bufoff;
773		dmu_buf_t *db = dbp[i];
774		caddr_t va;
775
776		ASSERT(size > 0);
777		ASSERT3U(db->db_size, >=, PAGESIZE);
778
779		bufoff = offset - db->db_offset;
780		tocpy = (int)MIN(db->db_size - bufoff, size);
781
782		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
783
784		if (tocpy == db->db_size)
785			dmu_buf_will_fill(db, tx);
786		else
787			dmu_buf_will_dirty(db, tx);
788
789		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
790			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
791			thiscpy = MIN(PAGESIZE, tocpy - copied);
792			va = zfs_map_page(pp, S_READ);
793			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
794			zfs_unmap_page(pp, va);
795			pp = pp->p_next;
796			bufoff += PAGESIZE;
797		}
798
799		if (tocpy == db->db_size)
800			dmu_buf_fill_done(db, tx);
801
802		offset += tocpy;
803		size -= tocpy;
804	}
805	dmu_buf_rele_array(dbp, numbufs, FTAG);
806	return (err);
807}
808#endif
809
810/*
811 * Allocate a loaned anonymous arc buffer.
812 */
813arc_buf_t *
814dmu_request_arcbuf(dmu_buf_t *handle, int size)
815{
816	dnode_t *dn = ((dmu_buf_impl_t *)handle)->db_dnode;
817
818	return (arc_loan_buf(dn->dn_objset->os_spa, size));
819}
820
821/*
822 * Free a loaned arc buffer.
823 */
824void
825dmu_return_arcbuf(arc_buf_t *buf)
826{
827	arc_return_buf(buf, FTAG);
828	VERIFY(arc_buf_remove_ref(buf, FTAG) == 1);
829}
830
831/*
832 * When possible, directly assign the passed loaned arc buffer to a dbuf.
833 * If this is not possible, copy the contents of the passed arc buf via
834 * dmu_write().
835 */
836void
837dmu_assign_arcbuf(dmu_buf_t *handle, uint64_t offset, arc_buf_t *buf,
838    dmu_tx_t *tx)
839{
840	dnode_t *dn = ((dmu_buf_impl_t *)handle)->db_dnode;
841	dmu_buf_impl_t *db;
842	uint32_t blksz = (uint32_t)arc_buf_size(buf);
843	uint64_t blkid;
844
845	rw_enter(&dn->dn_struct_rwlock, RW_READER);
846	blkid = dbuf_whichblock(dn, offset);
847	VERIFY((db = dbuf_hold(dn, blkid, FTAG)) != NULL);
848	rw_exit(&dn->dn_struct_rwlock);
849
850	if (offset == db->db.db_offset && blksz == db->db.db_size) {
851		dbuf_assign_arcbuf(db, buf, tx);
852		dbuf_rele(db, FTAG);
853	} else {
854		dbuf_rele(db, FTAG);
855		dmu_write(dn->dn_objset, dn->dn_object, offset, blksz,
856		    buf->b_data, tx);
857		dmu_return_arcbuf(buf);
858	}
859}
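
/*
 * Illustrative sketch (not taken from any particular caller) of the
 * loaned-buffer write path above, assuming the caller already holds a
 * dbuf of the target object (typically its bonus buffer).  bonus_db,
 * data, blksz, offset and tx are placeholder caller-side names:
 *
 *	arc_buf_t *abuf = dmu_request_arcbuf(bonus_db, blksz);
 *	bcopy(data, abuf->b_data, blksz);
 *	dmu_assign_arcbuf(bonus_db, offset, abuf, tx);
 *
 * If the buffer cannot be assigned directly (the offset or size does not
 * match an existing block), dmu_assign_arcbuf() falls back to dmu_write()
 * and returns the loaned buffer to the ARC.
 */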
860
861typedef struct {
862	dbuf_dirty_record_t	*dr;
863	dmu_sync_cb_t		*done;
864	void			*arg;
865} dmu_sync_arg_t;
866
867/* ARGSUSED */
868static void
869dmu_sync_ready(zio_t *zio, arc_buf_t *buf, void *varg)
870{
871	blkptr_t *bp = zio->io_bp;
872	dmu_sync_arg_t *in = varg;
873	dbuf_dirty_record_t *dr = in->dr;
874	dmu_buf_impl_t *db = dr->dr_dbuf;
875
876	mutex_enter(&db->db_mtx);
877	ASSERT(dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC);
878
879	if (!BP_IS_HOLE(bp)) {
880		ASSERT(BP_GET_TYPE(bp) == db->db_dnode->dn_type);
881		ASSERT(BP_GET_LEVEL(bp) == 0);
882		bp->blk_fill = 1;
883		dr->dt.dl.dr_overridden_by = *zio->io_bp;
884	} else {
885		/*
886		 * dmu_sync() can compress a block of zeros to a null blkptr
887		 * but the block size still needs to be passed through to replay
888		 */
889		BP_SET_LSIZE(bp, db->db.db_size);
890		dr->dt.dl.dr_overridden_by = *zio->io_bp;
891	}
892	dr->dt.dl.dr_override_state = DR_OVERRIDDEN;
893	mutex_exit(&db->db_mtx);
894}
895
896/* ARGSUSED */
897static void
898dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
899{
900	dmu_sync_arg_t *in = varg;
901	dbuf_dirty_record_t *dr = in->dr;
902	dmu_buf_impl_t *db = dr->dr_dbuf;
903	dmu_sync_cb_t *done = in->done;
904
905	cv_broadcast(&db->db_changed);
906	if (done)
907		done(&(db->db), in->arg);
908
909	kmem_free(in, sizeof (dmu_sync_arg_t));
910}
911
912/*
913 * Intent log support: sync the block associated with db to disk.
914 * N.B. and XXX: the caller is responsible for making sure that the
915 * data isn't changing while dmu_sync() is writing it.
916 *
917 * Return values:
918 *
919 *	EEXIST: this txg has already been synced, so there's nothing to do.
920 *		The caller should not log the write.
921 *
922 *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
923 *		The caller should not log the write.
924 *
925 *	EALREADY: this block is already in the process of being synced.
926 *		The caller should track its progress (somehow).
927 *
928 *	EINPROGRESS: the IO has been initiated.
929 *		The caller should log this blkptr in the callback.
930 *
931 *	0: completed.  Sets *bp to the blkptr just written.
932 *		The caller should log this blkptr immediately.
933 */
934int
935dmu_sync(zio_t *pio, dmu_buf_t *db_fake,
936    blkptr_t *bp, uint64_t txg, dmu_sync_cb_t *done, void *arg)
937{
938	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
939	objset_t *os = db->db_objset;
940	dsl_pool_t *dp = os->os_dsl_dataset->ds_dir->dd_pool;
941	tx_state_t *tx = &dp->dp_tx;
942	dbuf_dirty_record_t *dr;
943	dmu_sync_arg_t *in;
944	zbookmark_t zb;
945	writeprops_t wp = { 0 };
946	zio_t *zio;
947	int err;
948
949	ASSERT(BP_IS_HOLE(bp));
950	ASSERT(txg != 0);
951
952	dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
953	    txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);
954
955	/*
956	 * XXX - would be nice if we could do this without suspending...
957	 */
958	txg_suspend(dp);
959
960	/*
961	 * If this txg already synced, there's nothing to do.
962	 */
963	if (txg <= tx->tx_synced_txg) {
964		txg_resume(dp);
965		/*
966		 * If we're running ziltest, we need the blkptr regardless.
967		 */
968		if (txg > spa_freeze_txg(dp->dp_spa)) {
969			/* if db_blkptr == NULL, this was an empty write */
970			if (db->db_blkptr)
971				*bp = *db->db_blkptr; /* structure assignment */
972			return (0);
973		}
974		return (EEXIST);
975	}
976
977	mutex_enter(&db->db_mtx);
978
979	if (txg == tx->tx_syncing_txg) {
980		while (db->db_data_pending) {
981			/*
982			 * IO is in-progress.  Wait for it to finish.
983			 * XXX - would be nice to be able to somehow "attach"
984			 * this zio to the parent zio passed in.
985			 */
986			cv_wait(&db->db_changed, &db->db_mtx);
987			if (!db->db_data_pending &&
988			    db->db_blkptr && BP_IS_HOLE(db->db_blkptr)) {
989				/*
990				 * IO was compressed away
991				 */
992				*bp = *db->db_blkptr; /* structure assignment */
993				mutex_exit(&db->db_mtx);
994				txg_resume(dp);
995				return (0);
996			}
997			ASSERT(db->db_data_pending ||
998			    (db->db_blkptr && db->db_blkptr->blk_birth == txg));
999		}
1000
1001		if (db->db_blkptr && db->db_blkptr->blk_birth == txg) {
1002			/*
1003			 * IO is already completed.
1004			 */
1005			*bp = *db->db_blkptr; /* structure assignment */
1006			mutex_exit(&db->db_mtx);
1007			txg_resume(dp);
1008			return (0);
1009		}
1010	}
1011
1012	dr = db->db_last_dirty;
1013	while (dr && dr->dr_txg > txg)
1014		dr = dr->dr_next;
1015	if (dr == NULL || dr->dr_txg < txg) {
1016		/*
1017		 * This dbuf isn't dirty, must have been free_range'd.
1018		 * There's no need to log writes to freed blocks, so we're done.
1019		 */
1020		mutex_exit(&db->db_mtx);
1021		txg_resume(dp);
1022		return (ENOENT);
1023	}
1024
1025	ASSERT(dr->dr_txg == txg);
1026	if (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC) {
1027		/*
1028		 * We have already issued a sync write for this buffer.
1029		 */
1030		mutex_exit(&db->db_mtx);
1031		txg_resume(dp);
1032		return (EALREADY);
1033	} else if (dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
1034		/*
1035		 * This buffer has already been synced.  It could not
1036		 * have been dirtied since, or we would have cleared the state.
1037		 */
1038		*bp = dr->dt.dl.dr_overridden_by; /* structure assignment */
1039		mutex_exit(&db->db_mtx);
1040		txg_resume(dp);
1041		return (0);
1042	}
1043
1044	dr->dt.dl.dr_override_state = DR_IN_DMU_SYNC;
1045	in = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
1046	in->dr = dr;
1047	in->done = done;
1048	in->arg = arg;
1049	mutex_exit(&db->db_mtx);
1050	txg_resume(dp);
1051
1052	zb.zb_objset = os->os_dsl_dataset->ds_object;
1053	zb.zb_object = db->db.db_object;
1054	zb.zb_level = db->db_level;
1055	zb.zb_blkid = db->db_blkid;
1056
1057	wp.wp_type = db->db_dnode->dn_type;
1058	wp.wp_level = db->db_level;
1059	wp.wp_copies = os->os_copies;
1060	wp.wp_dnchecksum = db->db_dnode->dn_checksum;
1061	wp.wp_oschecksum = os->os_checksum;
1062	wp.wp_dncompress = db->db_dnode->dn_compress;
1063	wp.wp_oscompress = os->os_compress;
1064
1065	ASSERT(BP_IS_HOLE(bp));
1066
1067	zio = arc_write(pio, os->os_spa, &wp, DBUF_IS_L2CACHEABLE(db),
1068	    txg, bp, dr->dt.dl.dr_data, dmu_sync_ready, dmu_sync_done, in,
1069	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);
1070	if (pio) {
1071		zio_nowait(zio);
1072		err = EINPROGRESS;
1073	} else {
1074		err = zio_wait(zio);
1075		ASSERT(err == 0);
1076	}
1077	return (err);
1078}
1079
1080int
1081dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
1082	dmu_tx_t *tx)
1083{
1084	dnode_t *dn;
1085	int err;
1086
1087	err = dnode_hold(os, object, FTAG, &dn);
1088	if (err)
1089		return (err);
1090	err = dnode_set_blksz(dn, size, ibs, tx);
1091	dnode_rele(dn, FTAG);
1092	return (err);
1093}
1094
1095void
1096dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
1097	dmu_tx_t *tx)
1098{
1099	dnode_t *dn;
1100
1101	/* XXX assumes dnode_hold will not get an i/o error */
1102	(void) dnode_hold(os, object, FTAG, &dn);
1103	ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
1104	dn->dn_checksum = checksum;
1105	dnode_setdirty(dn, tx);
1106	dnode_rele(dn, FTAG);
1107}
1108
1109void
1110dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
1111	dmu_tx_t *tx)
1112{
1113	dnode_t *dn;
1114
1115	/* XXX assumes dnode_hold will not get an i/o error */
1116	(void) dnode_hold(os, object, FTAG, &dn);
1117	ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
1118	dn->dn_compress = compress;
1119	dnode_setdirty(dn, tx);
1120	dnode_rele(dn, FTAG);
1121}
1122
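/*
 * Report in *off the start of the next hole (when "hole" is set) or the
 * next data region at or after *off.  If the object has dirty changes
 * outstanding, they are synced out first so that the on-disk block
 * pointers being searched are current.  Returns ESRCH when no matching
 * region exists.
 */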
1123int
1124dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
1125{
1126	dnode_t *dn;
1127	int i, err;
1128
1129	err = dnode_hold(os, object, FTAG, &dn);
1130	if (err)
1131		return (err);
1132	/*
1133	 * Sync any current changes before
1134	 * we go trundling through the block pointers.
1135	 */
1136	for (i = 0; i < TXG_SIZE; i++) {
1137		if (list_link_active(&dn->dn_dirty_link[i]))
1138			break;
1139	}
1140	if (i != TXG_SIZE) {
1141		dnode_rele(dn, FTAG);
1142		txg_wait_synced(dmu_objset_pool(os), 0);
1143		err = dnode_hold(os, object, FTAG, &dn);
1144		if (err)
1145			return (err);
1146	}
1147
1148	err = dnode_next_offset(dn, (hole ? DNODE_FIND_HOLE : 0), off, 1, 1, 0);
1149	dnode_rele(dn, FTAG);
1150
1151	return (err);
1152}
1153
1154void
1155dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
1156{
1157	rw_enter(&dn->dn_struct_rwlock, RW_READER);
1158	mutex_enter(&dn->dn_mtx);
1159
1160	doi->doi_data_block_size = dn->dn_datablksz;
1161	doi->doi_metadata_block_size = dn->dn_indblkshift ?
1162	    1ULL << dn->dn_indblkshift : 0;
1163	doi->doi_indirection = dn->dn_nlevels;
1164	doi->doi_checksum = dn->dn_checksum;
1165	doi->doi_compress = dn->dn_compress;
1166	doi->doi_physical_blks = (DN_USED_BYTES(dn->dn_phys) +
1167	    SPA_MINBLOCKSIZE/2) >> SPA_MINBLOCKSHIFT;
1168	doi->doi_max_block_offset = dn->dn_phys->dn_maxblkid;
1169	doi->doi_type = dn->dn_type;
1170	doi->doi_bonus_size = dn->dn_bonuslen;
1171	doi->doi_bonus_type = dn->dn_bonustype;
1172
1173	mutex_exit(&dn->dn_mtx);
1174	rw_exit(&dn->dn_struct_rwlock);
1175}
1176
1177/*
1178 * Get information on a DMU object.
1179 * If doi is NULL, just indicates whether the object exists.
1180 */
1181int
1182dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
1183{
1184	dnode_t *dn;
1185	int err = dnode_hold(os, object, FTAG, &dn);
1186
1187	if (err)
1188		return (err);
1189
1190	if (doi != NULL)
1191		dmu_object_info_from_dnode(dn, doi);
1192
1193	dnode_rele(dn, FTAG);
1194	return (0);
1195}
1196
1197/*
1198 * As above, but faster; can be used when you have a held dbuf in hand.
1199 */
1200void
1201dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
1202{
1203	dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
1204}
1205
1206/*
1207 * Faster still when you only care about the size.
1208 * This is specifically optimized for zfs_getattr().
1209 */
1210void
1211dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
1212{
1213	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
1214
1215	*blksize = dn->dn_datablksz;
1216	/* add 1 for dnode space */
1217	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
1218	    SPA_MINBLOCKSHIFT) + 1;
1219}
1220
1221void
1222byteswap_uint64_array(void *vbuf, size_t size)
1223{
1224	uint64_t *buf = vbuf;
1225	size_t count = size >> 3;
1226	int i;
1227
1228	ASSERT((size & 7) == 0);
1229
1230	for (i = 0; i < count; i++)
1231		buf[i] = BSWAP_64(buf[i]);
1232}
1233
1234void
1235byteswap_uint32_array(void *vbuf, size_t size)
1236{
1237	uint32_t *buf = vbuf;
1238	size_t count = size >> 2;
1239	int i;
1240
1241	ASSERT((size & 3) == 0);
1242
1243	for (i = 0; i < count; i++)
1244		buf[i] = BSWAP_32(buf[i]);
1245}
1246
1247void
1248byteswap_uint16_array(void *vbuf, size_t size)
1249{
1250	uint16_t *buf = vbuf;
1251	size_t count = size >> 1;
1252	int i;
1253
1254	ASSERT((size & 1) == 0);
1255
1256	for (i = 0; i < count; i++)
1257		buf[i] = BSWAP_16(buf[i]);
1258}
1259
1260/* ARGSUSED */
1261void
1262byteswap_uint8_array(void *vbuf, size_t size)
1263{
1264}
1265
1266void
1267dmu_init(void)
1268{
1269	dbuf_init();
1270	dnode_init();
1271	zfetch_init();
1272	arc_init();
1273	l2arc_init();
1274}
1275
1276void
1277dmu_fini(void)
1278{
1279	arc_fini();
1280	zfetch_fini();
1281	dnode_fini();
1282	dbuf_fini();
1283	l2arc_fini();
1284}
1285