xref: /illumos-gate/usr/src/uts/common/fs/zfs/dmu.c (revision 0a586cea3ceec7e5e50e7e54c745082a7a333ac2)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include <sys/dmu.h>
27 #include <sys/dmu_impl.h>
28 #include <sys/dmu_tx.h>
29 #include <sys/dbuf.h>
30 #include <sys/dnode.h>
31 #include <sys/zfs_context.h>
32 #include <sys/dmu_objset.h>
33 #include <sys/dmu_traverse.h>
34 #include <sys/dsl_dataset.h>
35 #include <sys/dsl_dir.h>
36 #include <sys/dsl_pool.h>
37 #include <sys/dsl_synctask.h>
38 #include <sys/dsl_prop.h>
39 #include <sys/dmu_zfetch.h>
40 #include <sys/zfs_ioctl.h>
41 #include <sys/zap.h>
42 #include <sys/zio_checksum.h>
43 #include <sys/sa.h>
44 #ifdef _KERNEL
45 #include <sys/vmsystm.h>
46 #include <sys/zfs_znode.h>
47 #endif
48 
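/*
 * Each entry below pairs a DMU object type with its byteswap function, a
 * flag indicating whether the type is treated as metadata (consulted as
 * dmu_ot[type].ot_metadata in dmu_write_policy()), and a human-readable
 * name.  The table is indexed by dmu_object_type_t, so its order must match
 * the DMU_OT_* definitions in dmu.h.
 */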
49 const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
50 	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
51 	{	zap_byteswap,		TRUE,	"object directory"	},
52 	{	byteswap_uint64_array,	TRUE,	"object array"		},
53 	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
54 	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
55 	{	byteswap_uint64_array,	TRUE,	"bplist"		},
56 	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
57 	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
58 	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
59 	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
60 	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
61 	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
62 	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
63 	{	zap_byteswap,		TRUE,	"DSL directory child map"},
64 	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
65 	{	zap_byteswap,		TRUE,	"DSL props"		},
66 	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
67 	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
68 	{	zfs_oldacl_byteswap,	TRUE,	"ZFS V0 ACL"		},
69 	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
70 	{	zap_byteswap,		TRUE,	"ZFS directory"		},
71 	{	zap_byteswap,		TRUE,	"ZFS master node"	},
72 	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
73 	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
74 	{	zap_byteswap,		TRUE,	"zvol prop"		},
75 	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
76 	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
77 	{	zap_byteswap,		TRUE,	"other ZAP"		},
78 	{	zap_byteswap,		TRUE,	"persistent error log"	},
79 	{	byteswap_uint8_array,	TRUE,	"SPA history"		},
80 	{	byteswap_uint64_array,	TRUE,	"SPA history offsets"	},
81 	{	zap_byteswap,		TRUE,	"Pool properties"	},
82 	{	zap_byteswap,		TRUE,	"DSL permissions"	},
83 	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
84 	{	byteswap_uint8_array,	TRUE,	"ZFS SYSACL"		},
85 	{	byteswap_uint8_array,	TRUE,	"FUID table"		},
86 	{	byteswap_uint64_array,	TRUE,	"FUID table size"	},
87 	{	zap_byteswap,		TRUE,	"DSL dataset next clones"},
88 	{	zap_byteswap,		TRUE,	"scrub work queue"	},
89 	{	zap_byteswap,		TRUE,	"ZFS user/group used"	},
90 	{	zap_byteswap,		TRUE,	"ZFS user/group quota"	},
91 	{	zap_byteswap,		TRUE,	"snapshot refcount tags"},
92 	{	zap_byteswap,		TRUE,	"DDT ZAP algorithm"	},
93 	{	zap_byteswap,		TRUE,	"DDT statistics"	},
94 	{	byteswap_uint8_array,	TRUE,	"System attributes"	},
95 	{	zap_byteswap,		TRUE,	"SA master node"	},
96 	{	zap_byteswap,		TRUE,	"SA attr registration"	},
97 	{	zap_byteswap,		TRUE,	"SA attr layouts"	}, };
98 
99 int
100 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
101     void *tag, dmu_buf_t **dbp)
102 {
103 	dnode_t *dn;
104 	uint64_t blkid;
105 	dmu_buf_impl_t *db;
106 	int err;
107 
108 	err = dnode_hold(os, object, FTAG, &dn);
109 	if (err)
110 		return (err);
111 	blkid = dbuf_whichblock(dn, offset);
112 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
113 	db = dbuf_hold(dn, blkid, tag);
114 	rw_exit(&dn->dn_struct_rwlock);
115 	if (db == NULL) {
116 		err = EIO;
117 	} else {
118 		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
119 		if (err) {
120 			dbuf_rele(db, tag);
121 			db = NULL;
122 		}
123 	}
124 
125 	dnode_rele(dn, FTAG);
126 	*dbp = &db->db;
127 	return (err);
128 }
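
/*
 * Illustrative caller pattern for dmu_buf_hold() (sketch only; the names
 * below are placeholders): hold the buffer, use its contents, then drop the
 * hold with the same tag.
 *
 *	dmu_buf_t *db;
 *	int err = dmu_buf_hold(os, object, offset, FTAG, &db);
 *	if (err == 0) {
 *		... db->db_data is valid for db->db_size bytes ...
 *		dmu_buf_rele(db, FTAG);
 *	}
 */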
129 
130 int
131 dmu_bonus_max(void)
132 {
133 	return (DN_MAX_BONUSLEN);
134 }
135 
136 int
137 dmu_set_bonus(dmu_buf_t *db, int newsize, dmu_tx_t *tx)
138 {
139 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
140 
141 	if (dn->dn_bonus != (dmu_buf_impl_t *)db)
142 		return (EINVAL);
143 	if (newsize < 0 || newsize > db->db_size)
144 		return (EINVAL);
145 	dnode_setbonuslen(dn, newsize, tx);
146 	return (0);
147 }
148 
149 int
150 dmu_set_bonustype(dmu_buf_t *db, dmu_object_type_t type, dmu_tx_t *tx)
151 {
152 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
153 
154 	if (type > DMU_OT_NUMTYPES)
155 		return (EINVAL);
156 
157 	if (dn->dn_bonus != (dmu_buf_impl_t *)db)
158 		return (EINVAL);
159 
160 	dnode_setbonus_type(dn, type, tx);
161 	return (0);
162 }
163 
164 int
165 dmu_rm_spill(objset_t *os, uint64_t object, dmu_tx_t *tx)
166 {
167 	dnode_t *dn;
168 	int error;
169 
170 	error = dnode_hold(os, object, FTAG, &dn);
	if (error != 0)
		return (error);
171 	dbuf_rm_spill(dn, tx);
172 	dnode_rele(dn, FTAG);
173 	return (error);
174 }
175 
176 /*
177  * returns ENOENT, EIO, or 0.
178  */
179 int
180 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
181 {
182 	dnode_t *dn;
183 	dmu_buf_impl_t *db;
184 	int error;
185 
186 	error = dnode_hold(os, object, FTAG, &dn);
187 	if (error)
188 		return (error);
189 
190 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
191 	if (dn->dn_bonus == NULL) {
192 		rw_exit(&dn->dn_struct_rwlock);
193 		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
194 		if (dn->dn_bonus == NULL)
195 			dbuf_create_bonus(dn);
196 	}
197 	db = dn->dn_bonus;
198 	rw_exit(&dn->dn_struct_rwlock);
199 
200 	/* as long as the bonus buf is held, the dnode will be held */
201 	if (refcount_add(&db->db_holds, tag) == 1)
202 		VERIFY(dnode_add_ref(dn, db));
203 
204 	dnode_rele(dn, FTAG);
205 
206 	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
207 
208 	*dbp = &db->db;
209 	return (0);
210 }
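
/*
 * Illustrative use (sketch only): consumers such as the ZPL typically pair
 * dmu_bonus_hold() with dmu_buf_rele(), treating db->db_data as the object's
 * bonus area (e.g. znode attributes or SA data).
 *
 *	dmu_buf_t *db;
 *	if (dmu_bonus_hold(os, object, FTAG, &db) == 0) {
 *		... read, or dmu_buf_will_dirty() and modify, db->db_data ...
 *		dmu_buf_rele(db, FTAG);
 *	}
 */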
211 
212 /*
213  * returns ENOENT, EIO, or 0.
214  *
215  * This interface will allocate a blank spill dbuf when a spill blk
216  * doesn't already exist on the dnode.
217  *
218  * If you only want to find an already existing spill db, then
219  * dmu_spill_hold_existing() should be used.
220  */
221 int
222 dmu_spill_hold_by_dnode(dnode_t *dn, uint32_t flags, void *tag, dmu_buf_t **dbp)
223 {
224 	dmu_buf_impl_t *db = NULL;
225 	int err;
226 
227 	if ((flags & DB_RF_HAVESTRUCT) == 0)
228 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
229 
230 	db = dbuf_hold(dn, DMU_SPILL_BLKID, tag);
231 
232 	if ((flags & DB_RF_HAVESTRUCT) == 0)
233 		rw_exit(&dn->dn_struct_rwlock);
234 
235 	ASSERT(db != NULL);
236 	err = dbuf_read(db, NULL, DB_RF_MUST_SUCCEED | flags);
237 	*dbp = &db->db;
238 	return (err);
239 }
240 
241 int
242 dmu_spill_hold_existing(dmu_buf_t *bonus, void *tag, dmu_buf_t **dbp)
243 {
244 	dnode_t *dn = ((dmu_buf_impl_t *)bonus)->db_dnode;
245 	int err;
246 
247 	if (spa_version(dn->dn_objset->os_spa) < SPA_VERSION_SA)
248 		return (EINVAL);
249 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
250 
251 	if (!dn->dn_have_spill) {
252 		rw_exit(&dn->dn_struct_rwlock);
253 		return (ENOENT);
254 	}
255 	err = dmu_spill_hold_by_dnode(dn, DB_RF_HAVESTRUCT, tag, dbp);
256 	rw_exit(&dn->dn_struct_rwlock);
257 	return (err);
258 }
259 
260 int
261 dmu_spill_hold_by_bonus(dmu_buf_t *bonus, void *tag, dmu_buf_t **dbp)
262 {
263 	return (dmu_spill_hold_by_dnode(((dmu_buf_impl_t *)bonus)->db_dnode,
264 	    0, tag, dbp));
265 }
266 
267 /*
268  * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
269  * to take a held dnode rather than <os, object> -- the lookup is wasteful,
270  * and can induce severe lock contention when writing to several files
271  * whose dnodes are in the same block.
272  */
273 static int
274 dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset, uint64_t length,
275     int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp, uint32_t flags)
276 {
277 	dsl_pool_t *dp = NULL;
278 	dmu_buf_t **dbp;
279 	uint64_t blkid, nblks, i;
280 	uint32_t dbuf_flags;
281 	int err;
282 	zio_t *zio;
283 	hrtime_t start;
284 
285 	ASSERT(length <= DMU_MAX_ACCESS);
286 
287 	dbuf_flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT | DB_RF_HAVESTRUCT;
288 	if (flags & DMU_READ_NO_PREFETCH || length > zfetch_array_rd_sz)
289 		dbuf_flags |= DB_RF_NOPREFETCH;
290 
291 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
292 	if (dn->dn_datablkshift) {
293 		int blkshift = dn->dn_datablkshift;
294 		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
295 		    P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
296 	} else {
297 		if (offset + length > dn->dn_datablksz) {
298 			zfs_panic_recover("zfs: accessing past end of object "
299 			    "%llx/%llx (size=%u access=%llu+%llu)",
300 			    (longlong_t)dn->dn_objset->
301 			    os_dsl_dataset->ds_object,
302 			    (longlong_t)dn->dn_object, dn->dn_datablksz,
303 			    (longlong_t)offset, (longlong_t)length);
304 			rw_exit(&dn->dn_struct_rwlock);
305 			return (EIO);
306 		}
307 		nblks = 1;
308 	}
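	/*
	 * Example of the block count computed above: with 128K data blocks
	 * (blkshift = 17), offset = 1000 and length = 200000,
	 * P2ROUNDUP(201000, 128K) = 256K and P2ALIGN(1000, 128K) = 0,
	 * so nblks = 256K >> 17 = 2 buffers are held below.
	 */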
309 	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
310 
311 	if (dn->dn_objset->os_dsl_dataset)
312 		dp = dn->dn_objset->os_dsl_dataset->ds_dir->dd_pool;
313 	if (dp && dsl_pool_sync_context(dp))
314 		start = gethrtime();
315 	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, ZIO_FLAG_CANFAIL);
316 	blkid = dbuf_whichblock(dn, offset);
317 	for (i = 0; i < nblks; i++) {
318 		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
319 		if (db == NULL) {
320 			rw_exit(&dn->dn_struct_rwlock);
321 			dmu_buf_rele_array(dbp, nblks, tag);
322 			zio_nowait(zio);
323 			return (EIO);
324 		}
325 		/* initiate async i/o */
326 		if (read) {
327 			(void) dbuf_read(db, zio, dbuf_flags);
328 		}
329 		dbp[i] = &db->db;
330 	}
331 	rw_exit(&dn->dn_struct_rwlock);
332 
333 	/* wait for async i/o */
334 	err = zio_wait(zio);
335 	/* track read overhead when we are in sync context */
336 	if (dp && dsl_pool_sync_context(dp))
337 		dp->dp_read_overhead += gethrtime() - start;
338 	if (err) {
339 		dmu_buf_rele_array(dbp, nblks, tag);
340 		return (err);
341 	}
342 
343 	/* wait for other io to complete */
344 	if (read) {
345 		for (i = 0; i < nblks; i++) {
346 			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
347 			mutex_enter(&db->db_mtx);
348 			while (db->db_state == DB_READ ||
349 			    db->db_state == DB_FILL)
350 				cv_wait(&db->db_changed, &db->db_mtx);
351 			if (db->db_state == DB_UNCACHED)
352 				err = EIO;
353 			mutex_exit(&db->db_mtx);
354 			if (err) {
355 				dmu_buf_rele_array(dbp, nblks, tag);
356 				return (err);
357 			}
358 		}
359 	}
360 
361 	*numbufsp = nblks;
362 	*dbpp = dbp;
363 	return (0);
364 }
365 
366 static int
367 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
368     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
369 {
370 	dnode_t *dn;
371 	int err;
372 
373 	err = dnode_hold(os, object, FTAG, &dn);
374 	if (err)
375 		return (err);
376 
377 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
378 	    numbufsp, dbpp, DMU_READ_PREFETCH);
379 
380 	dnode_rele(dn, FTAG);
381 
382 	return (err);
383 }
384 
385 int
386 dmu_buf_hold_array_by_bonus(dmu_buf_t *db, uint64_t offset,
387     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
388 {
389 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
390 	int err;
391 
392 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
393 	    numbufsp, dbpp, DMU_READ_PREFETCH);
394 
395 	return (err);
396 }
397 
398 void
399 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
400 {
401 	int i;
402 	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
403 
404 	if (numbufs == 0)
405 		return;
406 
407 	for (i = 0; i < numbufs; i++) {
408 		if (dbp[i])
409 			dbuf_rele(dbp[i], tag);
410 	}
411 
412 	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
413 }
414 
415 void
416 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
417 {
418 	dnode_t *dn;
419 	uint64_t blkid;
420 	int nblks, i, err;
421 
422 	if (zfs_prefetch_disable)
423 		return;
424 
425 	if (len == 0) {  /* they're interested in the bonus buffer */
426 		dn = os->os_meta_dnode;
427 
428 		if (object == 0 || object >= DN_MAX_OBJECT)
429 			return;
430 
431 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
432 		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
433 		dbuf_prefetch(dn, blkid);
434 		rw_exit(&dn->dn_struct_rwlock);
435 		return;
436 	}
437 
438 	/*
439 	 * XXX - Note, if the dnode for the requested object is not
440 	 * already cached, we will do a *synchronous* read in the
441 	 * dnode_hold() call.  The same is true for any indirects.
442 	 */
443 	err = dnode_hold(os, object, FTAG, &dn);
444 	if (err != 0)
445 		return;
446 
447 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
448 	if (dn->dn_datablkshift) {
449 		int blkshift = dn->dn_datablkshift;
450 		nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
451 		    P2ALIGN(offset, 1<<blkshift)) >> blkshift;
452 	} else {
453 		nblks = (offset < dn->dn_datablksz);
454 	}
455 
456 	if (nblks != 0) {
457 		blkid = dbuf_whichblock(dn, offset);
458 		for (i = 0; i < nblks; i++)
459 			dbuf_prefetch(dn, blkid+i);
460 	}
461 
462 	rw_exit(&dn->dn_struct_rwlock);
463 
464 	dnode_rele(dn, FTAG);
465 }
466 
467 /*
468  * Get the next "chunk" of file data to free.  We traverse the file from
469  * the end so that the file gets shorter over time (if we crash in the
470  * middle, this will leave us in a better state).  We find allocated file
471  * data by simply searching the allocated level 1 indirects.
472  */
473 static int
474 get_next_chunk(dnode_t *dn, uint64_t *start, uint64_t limit)
475 {
476 	uint64_t len = *start - limit;
477 	uint64_t blkcnt = 0;
478 	uint64_t maxblks = DMU_MAX_ACCESS / (1ULL << (dn->dn_indblkshift + 1));
479 	uint64_t iblkrange =
480 	    dn->dn_datablksz * EPB(dn->dn_indblkshift, SPA_BLKPTRSHIFT);
481 
482 	ASSERT(limit <= *start);
483 
484 	if (len <= iblkrange * maxblks) {
485 		*start = limit;
486 		return (0);
487 	}
488 	ASSERT(ISP2(iblkrange));
489 
490 	while (*start > limit && blkcnt < maxblks) {
491 		int err;
492 
493 		/* find next allocated L1 indirect */
494 		err = dnode_next_offset(dn,
495 		    DNODE_FIND_BACKWARDS, start, 2, 1, 0);
496 
497 		/* if there are no more, then we are done */
498 		if (err == ESRCH) {
499 			*start = limit;
500 			return (0);
501 		} else if (err) {
502 			return (err);
503 		}
504 		blkcnt += 1;
505 
506 		/* reset offset to end of "next" block back */
507 		*start = P2ALIGN(*start, iblkrange);
508 		if (*start <= limit)
509 			*start = limit;
510 		else
511 			*start -= 1;
512 	}
513 	return (0);
514 }
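
/*
 * Note on the chunk sizing above: iblkrange is the span of file data mapped
 * by one level-1 indirect block (the data block size times the number of
 * block pointers per indirect block), and maxblks caps how many such L1
 * ranges a single call walks, so that each chunk freed in one transaction
 * dirties no more than roughly DMU_MAX_ACCESS worth of indirect blocks.
 */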
515 
516 static int
517 dmu_free_long_range_impl(objset_t *os, dnode_t *dn, uint64_t offset,
518     uint64_t length, boolean_t free_dnode)
519 {
520 	dmu_tx_t *tx;
521 	uint64_t object_size, start, end, len;
522 	boolean_t trunc = (length == DMU_OBJECT_END);
523 	int align, err;
524 
525 	align = 1 << dn->dn_datablkshift;
526 	ASSERT(align > 0);
527 	object_size = align == 1 ? dn->dn_datablksz :
528 	    (dn->dn_maxblkid + 1) << dn->dn_datablkshift;
529 
530 	end = offset + length;
531 	if (trunc || end > object_size)
532 		end = object_size;
533 	if (end <= offset)
534 		return (0);
535 	length = end - offset;
536 
537 	while (length) {
538 		start = end;
539 		/* assert(offset <= start) */
540 		err = get_next_chunk(dn, &start, offset);
541 		if (err)
542 			return (err);
543 		len = trunc ? DMU_OBJECT_END : end - start;
544 
545 		tx = dmu_tx_create(os);
546 		dmu_tx_hold_free(tx, dn->dn_object, start, len);
547 		err = dmu_tx_assign(tx, TXG_WAIT);
548 		if (err) {
549 			dmu_tx_abort(tx);
550 			return (err);
551 		}
552 
553 		dnode_free_range(dn, start, trunc ? -1 : len, tx);
554 
555 		if (start == 0 && free_dnode) {
556 			ASSERT(trunc);
557 			dnode_free(dn, tx);
558 		}
559 
560 		length -= end - start;
561 
562 		dmu_tx_commit(tx);
563 		end = start;
564 	}
565 	return (0);
566 }
567 
568 int
569 dmu_free_long_range(objset_t *os, uint64_t object,
570     uint64_t offset, uint64_t length)
571 {
572 	dnode_t *dn;
573 	int err;
574 
575 	err = dnode_hold(os, object, FTAG, &dn);
576 	if (err != 0)
577 		return (err);
578 	err = dmu_free_long_range_impl(os, dn, offset, length, FALSE);
579 	dnode_rele(dn, FTAG);
580 	return (err);
581 }
582 
583 int
584 dmu_free_object(objset_t *os, uint64_t object)
585 {
586 	dnode_t *dn;
587 	dmu_tx_t *tx;
588 	int err;
589 
590 	err = dnode_hold_impl(os, object, DNODE_MUST_BE_ALLOCATED,
591 	    FTAG, &dn);
592 	if (err != 0)
593 		return (err);
594 	if (dn->dn_nlevels == 1) {
595 		tx = dmu_tx_create(os);
596 		dmu_tx_hold_bonus(tx, object);
597 		dmu_tx_hold_free(tx, dn->dn_object, 0, DMU_OBJECT_END);
598 		err = dmu_tx_assign(tx, TXG_WAIT);
599 		if (err == 0) {
600 			dnode_free_range(dn, 0, DMU_OBJECT_END, tx);
601 			dnode_free(dn, tx);
602 			dmu_tx_commit(tx);
603 		} else {
604 			dmu_tx_abort(tx);
605 		}
606 	} else {
607 		err = dmu_free_long_range_impl(os, dn, 0, DMU_OBJECT_END, TRUE);
608 	}
609 	dnode_rele(dn, FTAG);
610 	return (err);
611 }
612 
613 int
614 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
615     uint64_t size, dmu_tx_t *tx)
616 {
617 	dnode_t *dn;
618 	int err = dnode_hold(os, object, FTAG, &dn);
619 	if (err)
620 		return (err);
621 	ASSERT(offset < UINT64_MAX);
622 	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
623 	dnode_free_range(dn, offset, size, tx);
624 	dnode_rele(dn, FTAG);
625 	return (0);
626 }
627 
628 int
629 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
630     void *buf, uint32_t flags)
631 {
632 	dnode_t *dn;
633 	dmu_buf_t **dbp;
634 	int numbufs, err;
635 
636 	err = dnode_hold(os, object, FTAG, &dn);
637 	if (err)
638 		return (err);
639 
640 	/*
641 	 * Deal with odd block sizes, where there can't be data past the first
642 	 * block.  If we ever do the tail block optimization, we will need to
643 	 * handle that here as well.
644 	 */
645 	if (dn->dn_maxblkid == 0) {
646 		int newsz = offset > dn->dn_datablksz ? 0 :
647 		    MIN(size, dn->dn_datablksz - offset);
648 		bzero((char *)buf + newsz, size - newsz);
649 		size = newsz;
650 	}
651 
652 	while (size > 0) {
653 		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
654 		int i;
655 
656 		/*
657 		 * NB: we could do this block-at-a-time, but it's nice
658 		 * to be reading in parallel.
659 		 */
660 		err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
661 		    TRUE, FTAG, &numbufs, &dbp, flags);
662 		if (err)
663 			break;
664 
665 		for (i = 0; i < numbufs; i++) {
666 			int tocpy;
667 			int bufoff;
668 			dmu_buf_t *db = dbp[i];
669 
670 			ASSERT(size > 0);
671 
672 			bufoff = offset - db->db_offset;
673 			tocpy = (int)MIN(db->db_size - bufoff, size);
674 
675 			bcopy((char *)db->db_data + bufoff, buf, tocpy);
676 
677 			offset += tocpy;
678 			size -= tocpy;
679 			buf = (char *)buf + tocpy;
680 		}
681 		dmu_buf_rele_array(dbp, numbufs, FTAG);
682 	}
683 	dnode_rele(dn, FTAG);
684 	return (err);
685 }
686 
687 void
688 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
689     const void *buf, dmu_tx_t *tx)
690 {
691 	dmu_buf_t **dbp;
692 	int numbufs, i;
693 
694 	if (size == 0)
695 		return;
696 
697 	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
698 	    FALSE, FTAG, &numbufs, &dbp));
699 
700 	for (i = 0; i < numbufs; i++) {
701 		int tocpy;
702 		int bufoff;
703 		dmu_buf_t *db = dbp[i];
704 
705 		ASSERT(size > 0);
706 
707 		bufoff = offset - db->db_offset;
708 		tocpy = (int)MIN(db->db_size - bufoff, size);
709 
710 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
711 
712 		if (tocpy == db->db_size)
713 			dmu_buf_will_fill(db, tx);
714 		else
715 			dmu_buf_will_dirty(db, tx);
716 
717 		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
718 
719 		if (tocpy == db->db_size)
720 			dmu_buf_fill_done(db, tx);
721 
722 		offset += tocpy;
723 		size -= tocpy;
724 		buf = (char *)buf + tocpy;
725 	}
726 	dmu_buf_rele_array(dbp, numbufs, FTAG);
727 }
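
/*
 * Illustrative caller sequence for dmu_write() (sketch only; names other
 * than the DMU/tx calls are placeholders):
 *
 *	dmu_tx_t *tx = dmu_tx_create(os);
 *	dmu_tx_hold_write(tx, object, offset, size);
 *	if (dmu_tx_assign(tx, TXG_WAIT) != 0) {
 *		dmu_tx_abort(tx);
 *	} else {
 *		dmu_write(os, object, offset, size, buf, tx);
 *		dmu_tx_commit(tx);
 *	}
 */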
728 
729 void
730 dmu_prealloc(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
731     dmu_tx_t *tx)
732 {
733 	dmu_buf_t **dbp;
734 	int numbufs, i;
735 
736 	if (size == 0)
737 		return;
738 
739 	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
740 	    FALSE, FTAG, &numbufs, &dbp));
741 
742 	for (i = 0; i < numbufs; i++) {
743 		dmu_buf_t *db = dbp[i];
744 
745 		dmu_buf_will_not_fill(db, tx);
746 	}
747 	dmu_buf_rele_array(dbp, numbufs, FTAG);
748 }
749 
750 /*
751  * DMU support for xuio
752  */
753 kstat_t *xuio_ksp = NULL;
754 
755 int
756 dmu_xuio_init(xuio_t *xuio, int nblk)
757 {
758 	dmu_xuio_t *priv;
759 	uio_t *uio = &xuio->xu_uio;
760 
761 	uio->uio_iovcnt = nblk;
762 	uio->uio_iov = kmem_zalloc(nblk * sizeof (iovec_t), KM_SLEEP);
763 
764 	priv = kmem_zalloc(sizeof (dmu_xuio_t), KM_SLEEP);
765 	priv->cnt = nblk;
766 	priv->bufs = kmem_zalloc(nblk * sizeof (arc_buf_t *), KM_SLEEP);
767 	priv->iovp = uio->uio_iov;
768 	XUIO_XUZC_PRIV(xuio) = priv;
769 
770 	if (XUIO_XUZC_RW(xuio) == UIO_READ)
771 		XUIOSTAT_INCR(xuiostat_onloan_rbuf, nblk);
772 	else
773 		XUIOSTAT_INCR(xuiostat_onloan_wbuf, nblk);
774 
775 	return (0);
776 }
777 
778 void
779 dmu_xuio_fini(xuio_t *xuio)
780 {
781 	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
782 	int nblk = priv->cnt;
783 
784 	kmem_free(priv->iovp, nblk * sizeof (iovec_t));
785 	kmem_free(priv->bufs, nblk * sizeof (arc_buf_t *));
786 	kmem_free(priv, sizeof (dmu_xuio_t));
787 
788 	if (XUIO_XUZC_RW(xuio) == UIO_READ)
789 		XUIOSTAT_INCR(xuiostat_onloan_rbuf, -nblk);
790 	else
791 		XUIOSTAT_INCR(xuiostat_onloan_wbuf, -nblk);
792 }
793 
794 /*
795  * Initialize iov[priv->next] and priv->bufs[priv->next] with { off, n, abuf }
796  * and increase priv->next by 1.
797  */
798 int
799 dmu_xuio_add(xuio_t *xuio, arc_buf_t *abuf, offset_t off, size_t n)
800 {
801 	struct iovec *iov;
802 	uio_t *uio = &xuio->xu_uio;
803 	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
804 	int i = priv->next++;
805 
806 	ASSERT(i < priv->cnt);
807 	ASSERT(off + n <= arc_buf_size(abuf));
808 	iov = uio->uio_iov + i;
809 	iov->iov_base = (char *)abuf->b_data + off;
810 	iov->iov_len = n;
811 	priv->bufs[i] = abuf;
812 	return (0);
813 }
814 
815 int
816 dmu_xuio_cnt(xuio_t *xuio)
817 {
818 	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
819 	return (priv->cnt);
820 }
821 
822 arc_buf_t *
823 dmu_xuio_arcbuf(xuio_t *xuio, int i)
824 {
825 	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
826 
827 	ASSERT(i < priv->cnt);
828 	return (priv->bufs[i]);
829 }
830 
831 void
832 dmu_xuio_clear(xuio_t *xuio, int i)
833 {
834 	dmu_xuio_t *priv = XUIO_XUZC_PRIV(xuio);
835 
836 	ASSERT(i < priv->cnt);
837 	priv->bufs[i] = NULL;
838 }
839 
840 static void
841 xuio_stat_init(void)
842 {
843 	xuio_ksp = kstat_create("zfs", 0, "xuio_stats", "misc",
844 	    KSTAT_TYPE_NAMED, sizeof (xuio_stats) / sizeof (kstat_named_t),
845 	    KSTAT_FLAG_VIRTUAL);
846 	if (xuio_ksp != NULL) {
847 		xuio_ksp->ks_data = &xuio_stats;
848 		kstat_install(xuio_ksp);
849 	}
850 }
851 
852 static void
853 xuio_stat_fini(void)
854 {
855 	if (xuio_ksp != NULL) {
856 		kstat_delete(xuio_ksp);
857 		xuio_ksp = NULL;
858 	}
859 }
860 
861 void
862 xuio_stat_wbuf_copied()
863 {
864 	XUIOSTAT_BUMP(xuiostat_wbuf_copied);
865 }
866 
867 void
868 xuio_stat_wbuf_nocopy()
869 {
870 	XUIOSTAT_BUMP(xuiostat_wbuf_nocopy);
871 }
872 
873 #ifdef _KERNEL
874 int
875 dmu_read_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size)
876 {
877 	dmu_buf_t **dbp;
878 	int numbufs, i, err;
879 	xuio_t *xuio = NULL;
880 
881 	/*
882 	 * NB: we could do this block-at-a-time, but it's nice
883 	 * to be reading in parallel.
884 	 */
885 	err = dmu_buf_hold_array(os, object, uio->uio_loffset, size, TRUE, FTAG,
886 	    &numbufs, &dbp);
887 	if (err)
888 		return (err);
889 
890 	if (uio->uio_extflg == UIO_XUIO)
891 		xuio = (xuio_t *)uio;
892 
893 	for (i = 0; i < numbufs; i++) {
894 		int tocpy;
895 		int bufoff;
896 		dmu_buf_t *db = dbp[i];
897 
898 		ASSERT(size > 0);
899 
900 		bufoff = uio->uio_loffset - db->db_offset;
901 		tocpy = (int)MIN(db->db_size - bufoff, size);
902 
903 		if (xuio) {
904 			dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
905 			arc_buf_t *dbuf_abuf = dbi->db_buf;
906 			arc_buf_t *abuf = dbuf_loan_arcbuf(dbi);
907 			err = dmu_xuio_add(xuio, abuf, bufoff, tocpy);
908 			if (!err) {
909 				uio->uio_resid -= tocpy;
910 				uio->uio_loffset += tocpy;
911 			}
912 
913 			if (abuf == dbuf_abuf)
914 				XUIOSTAT_BUMP(xuiostat_rbuf_nocopy);
915 			else
916 				XUIOSTAT_BUMP(xuiostat_rbuf_copied);
917 		} else {
918 			err = uiomove((char *)db->db_data + bufoff, tocpy,
919 			    UIO_READ, uio);
920 		}
921 		if (err)
922 			break;
923 
924 		size -= tocpy;
925 	}
926 	dmu_buf_rele_array(dbp, numbufs, FTAG);
927 
928 	return (err);
929 }
930 
931 int
932 dmu_write_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size,
933     dmu_tx_t *tx)
934 {
935 	dmu_buf_t **dbp;
936 	int numbufs, i;
937 	int err = 0;
938 
939 	if (size == 0)
940 		return (0);
941 
942 	err = dmu_buf_hold_array(os, object, uio->uio_loffset, size,
943 	    FALSE, FTAG, &numbufs, &dbp);
944 	if (err)
945 		return (err);
946 
947 	for (i = 0; i < numbufs; i++) {
948 		int tocpy;
949 		int bufoff;
950 		dmu_buf_t *db = dbp[i];
951 
952 		ASSERT(size > 0);
953 
954 		bufoff = uio->uio_loffset - db->db_offset;
955 		tocpy = (int)MIN(db->db_size - bufoff, size);
956 
957 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
958 
959 		if (tocpy == db->db_size)
960 			dmu_buf_will_fill(db, tx);
961 		else
962 			dmu_buf_will_dirty(db, tx);
963 
964 		/*
965 		 * XXX uiomove could block forever (eg. nfs-backed
966 		 * pages).  There needs to be a uiolockdown() function
967 		 * to lock the pages in memory, so that uiomove won't
968 		 * block.
969 		 */
970 		err = uiomove((char *)db->db_data + bufoff, tocpy,
971 		    UIO_WRITE, uio);
972 
973 		if (tocpy == db->db_size)
974 			dmu_buf_fill_done(db, tx);
975 
976 		if (err)
977 			break;
978 
979 		size -= tocpy;
980 	}
981 	dmu_buf_rele_array(dbp, numbufs, FTAG);
982 	return (err);
983 }
984 
985 int
986 dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
987     page_t *pp, dmu_tx_t *tx)
988 {
989 	dmu_buf_t **dbp;
990 	int numbufs, i;
991 	int err;
992 
993 	if (size == 0)
994 		return (0);
995 
996 	err = dmu_buf_hold_array(os, object, offset, size,
997 	    FALSE, FTAG, &numbufs, &dbp);
998 	if (err)
999 		return (err);
1000 
1001 	for (i = 0; i < numbufs; i++) {
1002 		int tocpy, copied, thiscpy;
1003 		int bufoff;
1004 		dmu_buf_t *db = dbp[i];
1005 		caddr_t va;
1006 
1007 		ASSERT(size > 0);
1008 		ASSERT3U(db->db_size, >=, PAGESIZE);
1009 
1010 		bufoff = offset - db->db_offset;
1011 		tocpy = (int)MIN(db->db_size - bufoff, size);
1012 
1013 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
1014 
1015 		if (tocpy == db->db_size)
1016 			dmu_buf_will_fill(db, tx);
1017 		else
1018 			dmu_buf_will_dirty(db, tx);
1019 
1020 		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
1021 			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
1022 			thiscpy = MIN(PAGESIZE, tocpy - copied);
1023 			va = zfs_map_page(pp, S_READ);
1024 			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
1025 			zfs_unmap_page(pp, va);
1026 			pp = pp->p_next;
1027 			bufoff += PAGESIZE;
1028 		}
1029 
1030 		if (tocpy == db->db_size)
1031 			dmu_buf_fill_done(db, tx);
1032 
1033 		offset += tocpy;
1034 		size -= tocpy;
1035 	}
1036 	dmu_buf_rele_array(dbp, numbufs, FTAG);
1037 	return (err);
1038 }
1039 #endif
1040 
1041 /*
1042  * Allocate a loaned anonymous arc buffer.
1043  */
1044 arc_buf_t *
1045 dmu_request_arcbuf(dmu_buf_t *handle, int size)
1046 {
1047 	dnode_t *dn = ((dmu_buf_impl_t *)handle)->db_dnode;
1048 
1049 	return (arc_loan_buf(dn->dn_objset->os_spa, size));
1050 }
1051 
1052 /*
1053  * Free a loaned arc buffer.
1054  */
1055 void
1056 dmu_return_arcbuf(arc_buf_t *buf)
1057 {
1058 	arc_return_buf(buf, FTAG);
1059 	VERIFY(arc_buf_remove_ref(buf, FTAG) == 1);
1060 }
1061 
1062 /*
1063  * When possible, directly assign the passed loaned arc buffer to a dbuf.
1064  * If this is not possible, copy the contents of the passed arc buf via
1065  * dmu_write().
1066  */
1067 void
1068 dmu_assign_arcbuf(dmu_buf_t *handle, uint64_t offset, arc_buf_t *buf,
1069     dmu_tx_t *tx)
1070 {
1071 	dnode_t *dn = ((dmu_buf_impl_t *)handle)->db_dnode;
1072 	dmu_buf_impl_t *db;
1073 	uint32_t blksz = (uint32_t)arc_buf_size(buf);
1074 	uint64_t blkid;
1075 
1076 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
1077 	blkid = dbuf_whichblock(dn, offset);
1078 	VERIFY((db = dbuf_hold(dn, blkid, FTAG)) != NULL);
1079 	rw_exit(&dn->dn_struct_rwlock);
1080 
1081 	if (offset == db->db.db_offset && blksz == db->db.db_size) {
1082 		dbuf_assign_arcbuf(db, buf, tx);
1083 		dbuf_rele(db, FTAG);
1084 	} else {
1085 		dbuf_rele(db, FTAG);
1086 		dmu_write(dn->dn_objset, dn->dn_object, offset, blksz,
1087 		    buf->b_data, tx);
1088 		dmu_return_arcbuf(buf);
1089 		XUIOSTAT_BUMP(xuiostat_wbuf_copied);
1090 	}
1091 }
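
/*
 * Illustrative zero-copy write path (sketch only; bonus_db, blksz, offset
 * and tx are placeholders): borrow an anonymous buffer, fill it, then hand
 * it to the dbuf within an assigned transaction.
 *
 *	arc_buf_t *abuf = dmu_request_arcbuf(bonus_db, blksz);
 *	... copy blksz bytes into abuf->b_data ...
 *	dmu_assign_arcbuf(bonus_db, offset, abuf, tx);
 *
 * A buffer that ends up unused can be given back with dmu_return_arcbuf().
 */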
1092 
1093 typedef struct {
1094 	dbuf_dirty_record_t	*dsa_dr;
1095 	dmu_sync_cb_t		*dsa_done;
1096 	zgd_t			*dsa_zgd;
1097 	dmu_tx_t		*dsa_tx;
1098 } dmu_sync_arg_t;
1099 
1100 /* ARGSUSED */
1101 static void
1102 dmu_sync_ready(zio_t *zio, arc_buf_t *buf, void *varg)
1103 {
1104 	dmu_sync_arg_t *dsa = varg;
1105 	dmu_buf_t *db = dsa->dsa_zgd->zgd_db;
1106 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
1107 	blkptr_t *bp = zio->io_bp;
1108 
1109 	if (zio->io_error == 0) {
1110 		if (BP_IS_HOLE(bp)) {
1111 			/*
1112 			 * A block of zeros may compress to a hole, but the
1113 			 * block size still needs to be known for replay.
1114 			 */
1115 			BP_SET_LSIZE(bp, db->db_size);
1116 		} else {
1117 			ASSERT(BP_GET_TYPE(bp) == dn->dn_type);
1118 			ASSERT(BP_GET_LEVEL(bp) == 0);
1119 			bp->blk_fill = 1;
1120 		}
1121 	}
1122 }
1123 
1124 static void
1125 dmu_sync_late_arrival_ready(zio_t *zio)
1126 {
1127 	dmu_sync_ready(zio, NULL, zio->io_private);
1128 }
1129 
1130 /* ARGSUSED */
1131 static void
1132 dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
1133 {
1134 	dmu_sync_arg_t *dsa = varg;
1135 	dbuf_dirty_record_t *dr = dsa->dsa_dr;
1136 	dmu_buf_impl_t *db = dr->dr_dbuf;
1137 
1138 	mutex_enter(&db->db_mtx);
1139 	ASSERT(dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC);
1140 	if (zio->io_error == 0) {
1141 		dr->dt.dl.dr_overridden_by = *zio->io_bp;
1142 		dr->dt.dl.dr_override_state = DR_OVERRIDDEN;
1143 		dr->dt.dl.dr_copies = zio->io_prop.zp_copies;
1144 		if (BP_IS_HOLE(&dr->dt.dl.dr_overridden_by))
1145 			BP_ZERO(&dr->dt.dl.dr_overridden_by);
1146 	} else {
1147 		dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
1148 	}
1149 	cv_broadcast(&db->db_changed);
1150 	mutex_exit(&db->db_mtx);
1151 
1152 	dsa->dsa_done(dsa->dsa_zgd, zio->io_error);
1153 
1154 	kmem_free(dsa, sizeof (*dsa));
1155 }
1156 
1157 static void
1158 dmu_sync_late_arrival_done(zio_t *zio)
1159 {
1160 	blkptr_t *bp = zio->io_bp;
1161 	dmu_sync_arg_t *dsa = zio->io_private;
1162 
1163 	if (zio->io_error == 0 && !BP_IS_HOLE(bp)) {
1164 		ASSERT(zio->io_bp->blk_birth == zio->io_txg);
1165 		ASSERT(zio->io_txg > spa_syncing_txg(zio->io_spa));
1166 		zio_free(zio->io_spa, zio->io_txg, zio->io_bp);
1167 	}
1168 
1169 	dmu_tx_commit(dsa->dsa_tx);
1170 
1171 	dsa->dsa_done(dsa->dsa_zgd, zio->io_error);
1172 
1173 	kmem_free(dsa, sizeof (*dsa));
1174 }
1175 
1176 static int
1177 dmu_sync_late_arrival(zio_t *pio, objset_t *os, dmu_sync_cb_t *done, zgd_t *zgd,
1178     zio_prop_t *zp, zbookmark_t *zb)
1179 {
1180 	dmu_sync_arg_t *dsa;
1181 	dmu_tx_t *tx;
1182 
1183 	tx = dmu_tx_create(os);
1184 	dmu_tx_hold_space(tx, zgd->zgd_db->db_size);
1185 	if (dmu_tx_assign(tx, TXG_WAIT) != 0) {
1186 		dmu_tx_abort(tx);
1187 		return (EIO);	/* Make zl_get_data do txg_wait_synced() */
1188 	}
1189 
1190 	dsa = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
1191 	dsa->dsa_dr = NULL;
1192 	dsa->dsa_done = done;
1193 	dsa->dsa_zgd = zgd;
1194 	dsa->dsa_tx = tx;
1195 
1196 	zio_nowait(zio_write(pio, os->os_spa, dmu_tx_get_txg(tx), zgd->zgd_bp,
1197 	    zgd->zgd_db->db_data, zgd->zgd_db->db_size, zp,
1198 	    dmu_sync_late_arrival_ready, dmu_sync_late_arrival_done, dsa,
1199 	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_CANFAIL, zb));
1200 
1201 	return (0);
1202 }
1203 
1204 /*
1205  * Intent log support: sync the block associated with db to disk.
1206  * N.B. and XXX: the caller is responsible for making sure that the
1207  * data isn't changing while dmu_sync() is writing it.
1208  *
1209  * Return values:
1210  *
1211  *	EEXIST: this txg has already been synced, so there's nothing to do.
1212  *		The caller should not log the write.
1213  *
1214  *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
1215  *		The caller should not log the write.
1216  *
1217  *	EALREADY: this block is already in the process of being synced.
1218  *		The caller should track its progress (somehow).
1219  *
1220  *	EIO: could not do the I/O.
1221  *		The caller should do a txg_wait_synced().
1222  *
1223  *	0: the I/O has been initiated.
1224  *		The caller should log this blkptr in the done callback.
1225  *		It is possible that the I/O will fail, in which case
1226  *		the error will be reported to the done callback and
1227  *		propagated to pio from zio_done().
1228  */
1229 int
1230 dmu_sync(zio_t *pio, uint64_t txg, dmu_sync_cb_t *done, zgd_t *zgd)
1231 {
1232 	blkptr_t *bp = zgd->zgd_bp;
1233 	dmu_buf_impl_t *db = (dmu_buf_impl_t *)zgd->zgd_db;
1234 	objset_t *os = db->db_objset;
1235 	dsl_dataset_t *ds = os->os_dsl_dataset;
1236 	dbuf_dirty_record_t *dr;
1237 	dmu_sync_arg_t *dsa;
1238 	zbookmark_t zb;
1239 	zio_prop_t zp;
1240 
1241 	ASSERT(pio != NULL);
1242 	ASSERT(BP_IS_HOLE(bp));
1243 	ASSERT(txg != 0);
1244 
1245 	SET_BOOKMARK(&zb, ds->ds_object,
1246 	    db->db.db_object, db->db_level, db->db_blkid);
1247 
1248 	dmu_write_policy(os, db->db_dnode, db->db_level, WP_DMU_SYNC, &zp);
1249 
1250 	/*
1251 	 * If we're frozen (running ziltest), we always need to generate a bp.
1252 	 */
1253 	if (txg > spa_freeze_txg(os->os_spa))
1254 		return (dmu_sync_late_arrival(pio, os, done, zgd, &zp, &zb));
1255 
1256 	/*
1257 	 * Grabbing db_mtx now provides a barrier between dbuf_sync_leaf()
1258 	 * and us.  If we determine that this txg is not yet syncing,
1259 	 * but it begins to sync a moment later, that's OK because the
1260 	 * sync thread will block in dbuf_sync_leaf() until we drop db_mtx.
1261 	 */
1262 	mutex_enter(&db->db_mtx);
1263 
1264 	if (txg <= spa_last_synced_txg(os->os_spa)) {
1265 		/*
1266 		 * This txg has already synced.  There's nothing to do.
1267 		 */
1268 		mutex_exit(&db->db_mtx);
1269 		return (EEXIST);
1270 	}
1271 
1272 	if (txg <= spa_syncing_txg(os->os_spa)) {
1273 		/*
1274 		 * This txg is currently syncing, so we can't mess with
1275 		 * the dirty record anymore; just write a new log block.
1276 		 */
1277 		mutex_exit(&db->db_mtx);
1278 		return (dmu_sync_late_arrival(pio, os, done, zgd, &zp, &zb));
1279 	}
1280 
1281 	dr = db->db_last_dirty;
1282 	while (dr && dr->dr_txg != txg)
1283 		dr = dr->dr_next;
1284 
1285 	if (dr == NULL) {
1286 		/*
1287 		 * There's no dr for this dbuf, so it must have been freed.
1288 		 * There's no need to log writes to freed blocks, so we're done.
1289 		 */
1290 		mutex_exit(&db->db_mtx);
1291 		return (ENOENT);
1292 	}
1293 
1294 	ASSERT(dr->dr_txg == txg);
1295 	if (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC ||
1296 	    dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
1297 		/*
1298 		 * We have already issued a sync write for this buffer,
1299 		 * or this buffer has already been synced.  It could not
1300 		 * have been dirtied since, or we would have cleared the state.
1301 		 */
1302 		mutex_exit(&db->db_mtx);
1303 		return (EALREADY);
1304 	}
1305 
1306 	ASSERT(dr->dt.dl.dr_override_state == DR_NOT_OVERRIDDEN);
1307 	dr->dt.dl.dr_override_state = DR_IN_DMU_SYNC;
1308 	mutex_exit(&db->db_mtx);
1309 
1310 	dsa = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
1311 	dsa->dsa_dr = dr;
1312 	dsa->dsa_done = done;
1313 	dsa->dsa_zgd = zgd;
1314 	dsa->dsa_tx = NULL;
1315 
1316 	zio_nowait(arc_write(pio, os->os_spa, txg,
1317 	    bp, dr->dt.dl.dr_data, DBUF_IS_L2CACHEABLE(db), &zp,
1318 	    dmu_sync_ready, dmu_sync_done, dsa,
1319 	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_CANFAIL, &zb));
1320 
1321 	return (0);
1322 }
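
/*
 * Context: dmu_sync() is driven from the ZIL's get-data callback when a
 * synchronous writer needs this block on stable storage before its txg
 * syncs.  On success, the block pointer written through zgd->zgd_bp
 * completes the corresponding log record, and the done callback typically
 * releases the zgd hold.
 */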
1323 
1324 int
1325 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
1326 	dmu_tx_t *tx)
1327 {
1328 	dnode_t *dn;
1329 	int err;
1330 
1331 	err = dnode_hold(os, object, FTAG, &dn);
1332 	if (err)
1333 		return (err);
1334 	err = dnode_set_blksz(dn, size, ibs, tx);
1335 	dnode_rele(dn, FTAG);
1336 	return (err);
1337 }
1338 
1339 void
1340 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
1341 	dmu_tx_t *tx)
1342 {
1343 	dnode_t *dn;
1344 
1345 	/* XXX assumes dnode_hold will not get an i/o error */
1346 	(void) dnode_hold(os, object, FTAG, &dn);
1347 	ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
1348 	dn->dn_checksum = checksum;
1349 	dnode_setdirty(dn, tx);
1350 	dnode_rele(dn, FTAG);
1351 }
1352 
1353 void
1354 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
1355 	dmu_tx_t *tx)
1356 {
1357 	dnode_t *dn;
1358 
1359 	/* XXX assumes dnode_hold will not get an i/o error */
1360 	(void) dnode_hold(os, object, FTAG, &dn);
1361 	ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
1362 	dn->dn_compress = compress;
1363 	dnode_setdirty(dn, tx);
1364 	dnode_rele(dn, FTAG);
1365 }
1366 
1367 int zfs_mdcomp_disable = 0;
1368 
1369 void
1370 dmu_write_policy(objset_t *os, dnode_t *dn, int level, int wp, zio_prop_t *zp)
1371 {
1372 	dmu_object_type_t type = dn ? dn->dn_type : DMU_OT_OBJSET;
1373 	boolean_t ismd = (level > 0 || dmu_ot[type].ot_metadata);
1374 	enum zio_checksum checksum = os->os_checksum;
1375 	enum zio_compress compress = os->os_compress;
1376 	enum zio_checksum dedup_checksum = os->os_dedup_checksum;
1377 	boolean_t dedup;
1378 	boolean_t dedup_verify = os->os_dedup_verify;
1379 	int copies = os->os_copies;
1380 
1381 	/*
1382 	 * Determine checksum setting.
1383 	 */
1384 	if (ismd) {
1385 		/*
1386 		 * Metadata always gets checksummed.  If the data
1387 		 * checksum is multi-bit correctable, and it's not a
1388 		 * ZBT-style checksum, then it's suitable for metadata
1389 		 * as well.  Otherwise, the metadata checksum defaults
1390 		 * to fletcher4.
1391 		 */
1392 		if (zio_checksum_table[checksum].ci_correctable < 1 ||
1393 		    zio_checksum_table[checksum].ci_eck)
1394 			checksum = ZIO_CHECKSUM_FLETCHER_4;
1395 	} else {
1396 		checksum = zio_checksum_select(dn->dn_checksum, checksum);
1397 	}
1398 
1399 	/*
1400 	 * Determine compression setting.
1401 	 */
1402 	if (ismd) {
1403 		/*
1404 		 * XXX -- we should design a compression algorithm
1405 		 * that specializes in arrays of bps.
1406 		 */
1407 		compress = zfs_mdcomp_disable ? ZIO_COMPRESS_EMPTY :
1408 		    ZIO_COMPRESS_LZJB;
1409 	} else {
1410 		compress = zio_compress_select(dn->dn_compress, compress);
1411 	}
1412 
1413 	/*
1414 	 * Determine dedup setting.  If we are in dmu_sync(), we won't
1415 	 * actually dedup now because that's all done in syncing context;
1416  * but we do want to use the dedup checksum.  If the checksum is not
1417 	 * strong enough to ensure unique signatures, force dedup_verify.
1418 	 */
1419 	dedup = (!ismd && dedup_checksum != ZIO_CHECKSUM_OFF);
1420 	if (dedup) {
1421 		checksum = dedup_checksum;
1422 		if (!zio_checksum_table[checksum].ci_dedup)
1423 			dedup_verify = 1;
1424 	}
1425 
1426 	if (wp & WP_DMU_SYNC)
1427 		dedup = 0;
1428 
1429 	if (wp & WP_NOFILL) {
1430 		ASSERT(!ismd && level == 0);
1431 		checksum = ZIO_CHECKSUM_OFF;
1432 		compress = ZIO_COMPRESS_OFF;
1433 		dedup = B_FALSE;
1434 	}
1435 
1436 	zp->zp_checksum = checksum;
1437 	zp->zp_compress = compress;
1438 	zp->zp_type = (wp & WP_SPILL) ? dn->dn_bonustype : type;
1439 	zp->zp_level = level;
1440 	zp->zp_copies = MIN(copies + ismd, spa_max_replication(os->os_spa));
1441 	zp->zp_dedup = dedup;
1442 	zp->zp_dedup_verify = dedup && dedup_verify;
1443 }
1444 
1445 int
1446 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
1447 {
1448 	dnode_t *dn;
1449 	int i, err;
1450 
1451 	err = dnode_hold(os, object, FTAG, &dn);
1452 	if (err)
1453 		return (err);
1454 	/*
1455 	 * Sync any current changes before
1456 	 * we go trundling through the block pointers.
1457 	 */
1458 	for (i = 0; i < TXG_SIZE; i++) {
1459 		if (list_link_active(&dn->dn_dirty_link[i]))
1460 			break;
1461 	}
1462 	if (i != TXG_SIZE) {
1463 		dnode_rele(dn, FTAG);
1464 		txg_wait_synced(dmu_objset_pool(os), 0);
1465 		err = dnode_hold(os, object, FTAG, &dn);
1466 		if (err)
1467 			return (err);
1468 	}
1469 
1470 	err = dnode_next_offset(dn, (hole ? DNODE_FIND_HOLE : 0), off, 1, 1, 0);
1471 	dnode_rele(dn, FTAG);
1472 
1473 	return (err);
1474 }
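
/*
 * Note: dmu_offset_next() is the primitive behind hole/data seeking (e.g.
 * the ZPL's SEEK_HOLE/SEEK_DATA support); the txg_wait_synced() above
 * ensures the on-disk block pointers reflect any pending dirty changes
 * before the search.
 */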
1475 
1476 void
1477 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
1478 {
1479 	dnode_phys_t *dnp;
1480 
1481 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
1482 	mutex_enter(&dn->dn_mtx);
1483 
1484 	dnp = dn->dn_phys;
1485 
1486 	doi->doi_data_block_size = dn->dn_datablksz;
1487 	doi->doi_metadata_block_size = dn->dn_indblkshift ?
1488 	    1ULL << dn->dn_indblkshift : 0;
1489 	doi->doi_type = dn->dn_type;
1490 	doi->doi_bonus_type = dn->dn_bonustype;
1491 	doi->doi_bonus_size = dn->dn_bonuslen;
1492 	doi->doi_indirection = dn->dn_nlevels;
1493 	doi->doi_checksum = dn->dn_checksum;
1494 	doi->doi_compress = dn->dn_compress;
1495 	doi->doi_physical_blocks_512 = (DN_USED_BYTES(dnp) + 256) >> 9;
1496 	doi->doi_max_offset = (dnp->dn_maxblkid + 1) * dn->dn_datablksz;
1497 	doi->doi_fill_count = 0;
1498 	for (int i = 0; i < dnp->dn_nblkptr; i++)
1499 		doi->doi_fill_count += dnp->dn_blkptr[i].blk_fill;
1500 
1501 	mutex_exit(&dn->dn_mtx);
1502 	rw_exit(&dn->dn_struct_rwlock);
1503 }
1504 
1505 /*
1506  * Get information on a DMU object.
1507  * If doi is NULL, this just indicates whether the object exists.
1508  */
1509 int
1510 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
1511 {
1512 	dnode_t *dn;
1513 	int err = dnode_hold(os, object, FTAG, &dn);
1514 
1515 	if (err)
1516 		return (err);
1517 
1518 	if (doi != NULL)
1519 		dmu_object_info_from_dnode(dn, doi);
1520 
1521 	dnode_rele(dn, FTAG);
1522 	return (0);
1523 }
1524 
1525 /*
1526  * As above, but faster; can be used when you have a held dbuf in hand.
1527  */
1528 void
1529 dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
1530 {
1531 	dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
1532 }
1533 
1534 /*
1535  * Faster still when you only care about the size.
1536  * This is specifically optimized for zfs_getattr().
1537  */
1538 void
1539 dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
1540 {
1541 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
1542 
1543 	*blksize = dn->dn_datablksz;
1544 	/* add 1 for dnode space */
1545 	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
1546 	    SPA_MINBLOCKSHIFT) + 1;
1547 }
1548 
1549 void
1550 byteswap_uint64_array(void *vbuf, size_t size)
1551 {
1552 	uint64_t *buf = vbuf;
1553 	size_t count = size >> 3;
1554 	int i;
1555 
1556 	ASSERT((size & 7) == 0);
1557 
1558 	for (i = 0; i < count; i++)
1559 		buf[i] = BSWAP_64(buf[i]);
1560 }
1561 
1562 void
1563 byteswap_uint32_array(void *vbuf, size_t size)
1564 {
1565 	uint32_t *buf = vbuf;
1566 	size_t count = size >> 2;
1567 	int i;
1568 
1569 	ASSERT((size & 3) == 0);
1570 
1571 	for (i = 0; i < count; i++)
1572 		buf[i] = BSWAP_32(buf[i]);
1573 }
1574 
1575 void
1576 byteswap_uint16_array(void *vbuf, size_t size)
1577 {
1578 	uint16_t *buf = vbuf;
1579 	size_t count = size >> 1;
1580 	int i;
1581 
1582 	ASSERT((size & 1) == 0);
1583 
1584 	for (i = 0; i < count; i++)
1585 		buf[i] = BSWAP_16(buf[i]);
1586 }
1587 
1588 /* ARGSUSED */
1589 void
1590 byteswap_uint8_array(void *vbuf, size_t size)
1591 {
1592 }
1593 
1594 void
1595 dmu_init(void)
1596 {
1597 	dbuf_init();
1598 	dnode_init();
1599 	zfetch_init();
1600 	arc_init();
1601 	l2arc_init();
1602 	xuio_stat_init();
1603 	sa_cache_init();
1604 }
1605 
1606 void
1607 dmu_fini(void)
1608 {
1609 	arc_fini();
1610 	zfetch_fini();
1611 	dnode_fini();
1612 	dbuf_fini();
1613 	l2arc_fini();
1614 	xuio_stat_fini();
1615 	sa_cache_fini();
1616 }
1617