xref: /illumos-gate/usr/src/uts/common/fs/zfs/dmu_tx.c (revision 347a31bcb38b51837caee115d3979d3a981cc099)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dbuf.h>
31 #include <sys/dmu_tx.h>
32 #include <sys/dmu_objset.h>
33 #include <sys/dsl_dataset.h> /* for dsl_dataset_block_freeable() */
34 #include <sys/dsl_dir.h> /* for dsl_dir_tempreserve_*() */
35 #include <sys/dsl_pool.h>
36 #include <sys/zap_impl.h>	/* for ZAP_BLOCK_SHIFT */
37 #include <sys/spa.h>
38 #include <sys/zfs_context.h>
39 
40 typedef void (*dmu_tx_hold_func_t)(dmu_tx_t *tx, struct dnode *dn,
41     uint64_t arg1, uint64_t arg2);
42 
43 #ifdef ZFS_DEBUG
44 int dmu_use_tx_debug_bufs = 1;
45 #endif
46 
47 dmu_tx_t *
48 dmu_tx_create_ds(dsl_dir_t *dd)
49 {
50 	dmu_tx_t *tx = kmem_zalloc(sizeof (dmu_tx_t), KM_SLEEP);
51 	tx->tx_dir = dd;
52 	if (dd)
53 		tx->tx_pool = dd->dd_pool;
54 	list_create(&tx->tx_holds, sizeof (dmu_tx_hold_t),
55 	    offsetof(dmu_tx_hold_t, dth_node));
56 	refcount_create(&tx->tx_space_written);
57 	refcount_create(&tx->tx_space_freed);
58 	return (tx);
59 }
60 
61 dmu_tx_t *
62 dmu_tx_create(objset_t *os)
63 {
64 	dmu_tx_t *tx = dmu_tx_create_ds(os->os->os_dsl_dataset->ds_dir);
65 	tx->tx_objset = os;
66 	tx->tx_lastsnap_txg = dsl_dataset_prev_snap_txg(os->os->os_dsl_dataset);
67 	return (tx);
68 }
69 
70 dmu_tx_t *
71 dmu_tx_create_assigned(struct dsl_pool *dp, uint64_t txg)
72 {
73 	dmu_tx_t *tx = dmu_tx_create_ds(NULL);
74 
75 	ASSERT3U(txg, <=, dp->dp_tx.tx_open_txg);
76 	tx->tx_pool = dp;
77 	tx->tx_txg = txg;
78 	tx->tx_anyobj = TRUE;
79 
80 	return (tx);
81 }
82 
83 int
84 dmu_tx_is_syncing(dmu_tx_t *tx)
85 {
86 	return (tx->tx_anyobj);
87 }
88 
89 int
90 dmu_tx_private_ok(dmu_tx_t *tx)
91 {
92 	return (tx->tx_anyobj);
93 }
94 
95 static void
96 dmu_tx_hold_object_impl(dmu_tx_t *tx, objset_t *os, uint64_t object,
97     enum dmu_tx_hold_type type, dmu_tx_hold_func_t func,
98     uint64_t arg1, uint64_t arg2)
99 {
100 	dmu_tx_hold_t *dth;
101 	dnode_t *dn = NULL;
102 	int err;
103 
104 	if (object != DMU_NEW_OBJECT) {
105 		err = dnode_hold(os->os, object, tx, &dn);
106 		if (err) {
107 			tx->tx_err = err;
108 			return;
109 		}
110 
111 		if (err == 0 && tx->tx_txg != 0) {
112 			mutex_enter(&dn->dn_mtx);
113 			/*
114 			 * dn->dn_assigned_txg == tx->tx_txg doesn't pose a
115 			 * problem, but there's no way for it to happen (for
116 			 * now, at least).
117 			 */
118 			ASSERT(dn->dn_assigned_txg == 0);
119 			ASSERT(dn->dn_assigned_tx == NULL);
120 			dn->dn_assigned_txg = tx->tx_txg;
121 			dn->dn_assigned_tx = tx;
122 			(void) refcount_add(&dn->dn_tx_holds, tx);
123 			mutex_exit(&dn->dn_mtx);
124 		}
125 	}
126 
127 	dth = kmem_zalloc(sizeof (dmu_tx_hold_t), KM_SLEEP);
128 	dth->dth_dnode = dn;
129 	dth->dth_type = type;
130 	dth->dth_arg1 = arg1;
131 	dth->dth_arg2 = arg2;
132 	list_insert_tail(&tx->tx_holds, dth);
133 
134 	if (func)
135 		func(tx, dn, arg1, arg2);
136 }
137 
138 void
139 dmu_tx_add_new_object(dmu_tx_t *tx, objset_t *os, uint64_t object)
140 {
141 	/*
142 	 * If we're syncing, they can manipulate any object anyhow, and
143 	 * the hold on the dnode_t can cause problems.
144 	 */
145 	if (!dmu_tx_is_syncing(tx)) {
146 		dmu_tx_hold_object_impl(tx, os, object, THT_NEWOBJECT,
147 		    NULL, 0, 0);
148 	}
149 }
150 
151 static int
152 dmu_tx_check_ioerr(zio_t *zio, dnode_t *dn, int level, uint64_t blkid)
153 {
154 	int err;
155 	dmu_buf_impl_t *db;
156 
157 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
158 	db = dbuf_hold_level(dn, level, blkid, FTAG);
159 	rw_exit(&dn->dn_struct_rwlock);
160 	if (db == NULL)
161 		return (EIO);
162 	err = dbuf_read(db, zio, DB_RF_CANFAIL);
163 	dbuf_rele(db, FTAG);
164 	return (err);
165 }
166 
167 /* ARGSUSED */
168 static void
169 dmu_tx_count_write(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
170 {
171 	uint64_t start, end, i, space;
172 	int min_bs, max_bs, min_ibs, max_ibs, epbs, bits;
173 
174 	if (len == 0)
175 		return;
176 
177 	min_bs = SPA_MINBLOCKSHIFT;
178 	max_bs = SPA_MAXBLOCKSHIFT;
179 	min_ibs = DN_MIN_INDBLKSHIFT;
180 	max_ibs = DN_MAX_INDBLKSHIFT;
181 
182 	/*
183 	 * For i/o error checking, read the first and last level-0
184 	 * blocks, and all the level-1 blocks.  We needn't do this on
185 	 * the meta-dnode, because we've already read it in.
186 	 */
187 
188 	if (dn && dn->dn_object != DMU_META_DNODE_OBJECT) {
189 		int err;
190 
191 		if (dn->dn_maxblkid == 0) {
192 			err = dmu_tx_check_ioerr(NULL, dn, 0, 0);
193 			if (err) {
194 				tx->tx_err = err;
195 				return;
196 			}
197 		} else {
198 			zio_t *zio = zio_root(tx->tx_pool->dp_spa,
199 			    NULL, NULL, ZIO_FLAG_CANFAIL);
200 
201 			/* first level-0 block */
202 			start = off/dn->dn_datablksz;
203 			err = dmu_tx_check_ioerr(zio, dn, 0, start);
204 			if (err) {
205 				tx->tx_err = err;
206 				return;
207 			}
208 
209 			/* last level-0 block */
210 			end = (off+len)/dn->dn_datablksz;
211 			if (end != start) {
212 				err = dmu_tx_check_ioerr(zio, dn, 0, end);
213 				if (err) {
214 					tx->tx_err = err;
215 					return;
216 				}
217 			}
218 
219 			/* level-1 blocks */
220 			if (dn->dn_nlevels > 1) {
221 				start >>= dn->dn_indblkshift - SPA_BLKPTRSHIFT;
222 				end >>= dn->dn_indblkshift - SPA_BLKPTRSHIFT;
223 				for (i = start+1; i < end; i++) {
224 					err = dmu_tx_check_ioerr(zio, dn, 1, i);
225 					if (err) {
226 						tx->tx_err = err;
227 						return;
228 					}
229 				}
230 			}
231 
232 			err = zio_wait(zio);
233 			if (err) {
234 				tx->tx_err = err;
235 				return;
236 			}
237 		}
238 	}
239 
240 	/*
241 	 * If there's more than one block, the blocksize can't change,
242 	 * so we can make a more precise estimate.  Alternatively,
243 	 * if the dnode's ibs is larger than max_ibs, always use that.
244 	 * This ensures that if we reduce DN_MAX_INDBLKSHIFT,
245 	 * the code will still work correctly on existing pools.
246 	 */
247 	if (dn && (dn->dn_maxblkid != 0 || dn->dn_indblkshift > max_ibs)) {
248 		min_ibs = max_ibs = dn->dn_indblkshift;
249 		if (dn->dn_datablkshift != 0)
250 			min_bs = max_bs = dn->dn_datablkshift;
251 	}
252 
253 	/*
254 	 * 'end' is the last thing we will access, not one past.
255 	 * This way we won't overflow when accessing the last byte.
256 	 */
257 	start = P2ALIGN(off, 1ULL << max_bs);
258 	end = P2ROUNDUP(off + len, 1ULL << max_bs) - 1;
259 	space = end - start + 1;
260 
261 	start >>= min_bs;
262 	end >>= min_bs;
263 
264 	epbs = min_ibs - SPA_BLKPTRSHIFT;
265 
266 	/*
267 	 * The object contains at most 2^(64 - min_bs) blocks,
268 	 * and each indirect level maps 2^epbs.
269 	 */
270 	for (bits = 64 - min_bs; bits >= 0; bits -= epbs) {
271 		start >>= epbs;
272 		end >>= epbs;
273 		/*
274 		 * If we increase the number of levels of indirection,
275 		 * we'll need new blkid=0 indirect blocks.  If start == 0,
276 		 * we're already accounting for that blocks; and if end == 0,
277 		 * we can't increase the number of levels beyond that.
278 		 */
279 		if (start != 0 && end != 0)
280 			space += 1ULL << max_ibs;
281 		space += (end - start + 1) << max_ibs;
282 	}
283 
284 	ASSERT(space < 2 * DMU_MAX_ACCESS);
285 
286 	tx->tx_space_towrite += space;
287 }
288 
289 static void
290 dmu_tx_count_dnode(dmu_tx_t *tx, dnode_t *dn)
291 {
292 	dnode_t *mdn = tx->tx_objset->os->os_meta_dnode;
293 	uint64_t object = dn ? dn->dn_object : DN_MAX_OBJECT - 1;
294 	uint64_t pre_write_space;
295 
296 	ASSERT(object < DN_MAX_OBJECT);
297 	pre_write_space = tx->tx_space_towrite;
298 	dmu_tx_count_write(tx, mdn, object << DNODE_SHIFT, 1 << DNODE_SHIFT);
299 	if (dn && dn->dn_dbuf->db_blkptr &&
300 	    dsl_dataset_block_freeable(dn->dn_objset->os_dsl_dataset,
301 	    dn->dn_dbuf->db_blkptr->blk_birth)) {
302 		tx->tx_space_tooverwrite +=
303 			tx->tx_space_towrite - pre_write_space;
304 		tx->tx_space_towrite = pre_write_space;
305 	}
306 }
307 
308 /* ARGSUSED */
309 static void
310 dmu_tx_hold_write_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
311 {
312 	dmu_tx_count_write(tx, dn, off, len);
313 	dmu_tx_count_dnode(tx, dn);
314 }
315 
316 void
317 dmu_tx_hold_write(dmu_tx_t *tx, uint64_t object, uint64_t off, int len)
318 {
319 	ASSERT(tx->tx_txg == 0);
320 	ASSERT(len < DMU_MAX_ACCESS);
321 	ASSERT(UINT64_MAX - off >= len - 1);
322 
323 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_WRITE,
324 	    dmu_tx_hold_write_impl, off, len);
325 }
326 
327 static void
328 dmu_tx_count_free(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
329 {
330 	uint64_t blkid, nblks;
331 	uint64_t space = 0;
332 	dsl_dataset_t *ds = dn->dn_objset->os_dsl_dataset;
333 	int dirty;
334 
335 	/*
336 	 * We don't need to use any locking to check for dirtyness
337 	 * because it's OK if we get stale data -- the dnode may become
338 	 * dirty immediately after our check anyway.  However, we need
339 	 * the lock to ensure that the link isn't changing while we call
340 	 * list_link_active(), to satisfy its assertions.  This is just
341 	 * a means to avoid the expensive count when we aren't sure we
342 	 * need it.  We need to be able to deal with a dirty dnode.
343 	 */
344 	mutex_enter(&dn->dn_objset->os_lock);
345 	dirty = list_link_active(&dn->dn_dirty_link[0]) |
346 	    list_link_active(&dn->dn_dirty_link[1]) |
347 	    list_link_active(&dn->dn_dirty_link[2]) |
348 	    list_link_active(&dn->dn_dirty_link[3]);
349 	mutex_exit(&dn->dn_objset->os_lock);
350 	if (dn->dn_assigned_tx || dirty)
351 		return;
352 
353 	/*
354 	 * the struct_rwlock protects us against dn_phys->dn_nlevels
355 	 * changing, in case (against all odds) we manage to dirty &
356 	 * sync out the changes after we check for being dirty.
357 	 * also, dbuf_hold_impl() wants us to have the struct_rwlock.
358 	 *
359 	 * It's fine to use dn_datablkshift rather than the dn_phys
360 	 * equivalent because if it is changing, maxblkid==0 and we will
361 	 * bail.
362 	 */
363 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
364 	if (dn->dn_phys->dn_maxblkid == 0) {
365 		if (off == 0 && len >= dn->dn_datablksz) {
366 			blkid = 0;
367 			nblks = 1;
368 		} else {
369 			rw_exit(&dn->dn_struct_rwlock);
370 			return;
371 		}
372 	} else {
373 		blkid = off >> dn->dn_datablkshift;
374 		nblks = (off + len) >> dn->dn_datablkshift;
375 
376 		if (blkid >= dn->dn_phys->dn_maxblkid) {
377 			rw_exit(&dn->dn_struct_rwlock);
378 			return;
379 		}
380 		if (blkid + nblks > dn->dn_phys->dn_maxblkid)
381 			nblks = dn->dn_phys->dn_maxblkid - blkid;
382 
383 		/* don't bother after 128,000 blocks */
384 		nblks = MIN(nblks, 128*1024);
385 	}
386 
387 	if (dn->dn_phys->dn_nlevels == 1) {
388 		int i;
389 		for (i = 0; i < nblks; i++) {
390 			blkptr_t *bp = dn->dn_phys->dn_blkptr;
391 			ASSERT3U(blkid + i, <, dn->dn_phys->dn_nblkptr);
392 			bp += blkid + i;
393 			if (dsl_dataset_block_freeable(ds, bp->blk_birth)) {
394 				dprintf_bp(bp, "can free old%s", "");
395 				space += BP_GET_ASIZE(bp);
396 			}
397 		}
398 		nblks = 0;
399 	}
400 
401 	while (nblks) {
402 		dmu_buf_impl_t *dbuf;
403 		int err, epbs, blkoff, tochk;
404 
405 		epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
406 		blkoff = P2PHASE(blkid, 1<<epbs);
407 		tochk = MIN((1<<epbs) - blkoff, nblks);
408 
409 		err = dbuf_hold_impl(dn, 1, blkid >> epbs, TRUE, FTAG, &dbuf);
410 		if (err == 0) {
411 			int i;
412 			blkptr_t *bp;
413 
414 			err = dbuf_read(dbuf, NULL,
415 			    DB_RF_HAVESTRUCT | DB_RF_CANFAIL);
416 			if (err != 0) {
417 				tx->tx_err = err;
418 				dbuf_rele(dbuf, FTAG);
419 				break;
420 			}
421 
422 			bp = dbuf->db.db_data;
423 			bp += blkoff;
424 
425 			for (i = 0; i < tochk; i++) {
426 				if (dsl_dataset_block_freeable(ds,
427 				    bp[i].blk_birth)) {
428 					dprintf_bp(&bp[i],
429 					    "can free old%s", "");
430 					space += BP_GET_ASIZE(&bp[i]);
431 				}
432 			}
433 			dbuf_rele(dbuf, FTAG);
434 		}
435 		if (err != 0 && err != ENOENT) {
436 			tx->tx_err = err;
437 			break;
438 		}
439 
440 		blkid += tochk;
441 		nblks -= tochk;
442 	}
443 	rw_exit(&dn->dn_struct_rwlock);
444 
445 	tx->tx_space_tofree += space;
446 }
447 
448 static void
449 dmu_tx_hold_free_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
450 {
451 	uint64_t start, end, i;
452 	int err, shift;
453 	zio_t *zio;
454 
455 	/* first block */
456 	if (off != 0 /* || dn->dn_maxblkid == 0 */)
457 		dmu_tx_count_write(tx, dn, off, 1);
458 	/* last block */
459 	if (len != DMU_OBJECT_END)
460 		dmu_tx_count_write(tx, dn, off+len, 1);
461 
462 	if (off >= (dn->dn_maxblkid+1) * dn->dn_datablksz)
463 		return;
464 	if (len == DMU_OBJECT_END)
465 		len = (dn->dn_maxblkid+1) * dn->dn_datablksz - off;
466 
467 	/*
468 	 * For i/o error checking, read the first and last level-0
469 	 * blocks, and all the level-1 blocks.  The above count_write's
470 	 * will take care of the level-0 blocks.
471 	 */
472 	shift = dn->dn_datablkshift + dn->dn_indblkshift - SPA_BLKPTRSHIFT;
473 	start = off >> shift;
474 	end = dn->dn_datablkshift ? ((off+len) >> shift) : 0;
475 
476 	zio = zio_root(tx->tx_pool->dp_spa, NULL, NULL, ZIO_FLAG_CANFAIL);
477 	for (i = start+1; i < end; i++) {
478 		uint64_t ibyte = i << shift;
479 		err = dnode_next_offset(dn, FALSE, &ibyte, 2, 1);
480 		i = ibyte >> shift;
481 		if (err == ESRCH)
482 			break;
483 		if (err) {
484 			tx->tx_err = err;
485 			return;
486 		}
487 
488 		err = dmu_tx_check_ioerr(zio, dn, 1, i);
489 		if (err) {
490 			tx->tx_err = err;
491 			return;
492 		}
493 	}
494 	err = zio_wait(zio);
495 	if (err) {
496 		tx->tx_err = err;
497 		return;
498 	}
499 
500 	dmu_tx_count_dnode(tx, dn);
501 	dmu_tx_count_free(tx, dn, off, len);
502 }
503 
504 void
505 dmu_tx_hold_free(dmu_tx_t *tx, uint64_t object, uint64_t off, uint64_t len)
506 {
507 	ASSERT(tx->tx_txg == 0);
508 
509 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_FREE,
510 	    dmu_tx_hold_free_impl, off, len);
511 }
512 
513 /* ARGSUSED */
514 static void
515 dmu_tx_hold_zap_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t add, uint64_t iname)
516 {
517 	uint64_t nblocks;
518 	int epbs, err;
519 	char *name = (char *)(uintptr_t)iname;
520 
521 	dmu_tx_count_dnode(tx, dn);
522 
523 	if (dn == NULL) {
524 		/*
525 		 * We will be able to fit a new object's entries into one leaf
526 		 * block.  So there will be at most 2 blocks total,
527 		 * including the header block.
528 		 */
529 		dmu_tx_count_write(tx, dn, 0, 2 << fzap_default_block_shift);
530 		return;
531 	}
532 
533 	ASSERT3P(dmu_ot[dn->dn_type].ot_byteswap, ==, zap_byteswap);
534 
535 	if (dn->dn_maxblkid == 0 && !add) {
536 		/*
537 		 * If there is only one block  (i.e. this is a micro-zap)
538 		 * and we are not adding anything, the accounting is simple.
539 		 */
540 		err = dmu_tx_check_ioerr(NULL, dn, 0, 0);
541 		if (err) {
542 			tx->tx_err = err;
543 			return;
544 		}
545 
546 		if (dsl_dataset_block_freeable(dn->dn_objset->os_dsl_dataset,
547 		    dn->dn_phys->dn_blkptr[0].blk_birth))
548 			tx->tx_space_tooverwrite += dn->dn_datablksz;
549 		else
550 			tx->tx_space_towrite += dn->dn_datablksz;
551 		return;
552 	}
553 
554 	if (dn->dn_maxblkid > 0 && name) {
555 		/*
556 		 * access the name in this fat-zap so that we'll check
557 		 * for i/o errors to the leaf blocks, etc.
558 		 */
559 		err = zap_lookup(&dn->dn_objset->os, dn->dn_object, name,
560 		    8, 0, NULL);
561 		if (err == EIO) {
562 			tx->tx_err = err;
563 			return;
564 		}
565 	}
566 
567 	/*
568 	 * 3 blocks overwritten: target leaf, ptrtbl block, header block
569 	 * 3 new blocks written if adding: new split leaf, 2 grown ptrtbl blocks
570 	 */
571 	dmu_tx_count_write(tx, dn, dn->dn_maxblkid * dn->dn_datablksz,
572 	    (3 + add ? 3 : 0) << dn->dn_datablkshift);
573 
574 	/*
575 	 * If the modified blocks are scattered to the four winds,
576 	 * we'll have to modify an indirect twig for each.
577 	 */
578 	epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
579 	for (nblocks = dn->dn_maxblkid >> epbs; nblocks != 0; nblocks >>= epbs)
580 		tx->tx_space_towrite += 3 << dn->dn_indblkshift;
581 }
582 
583 void
584 dmu_tx_hold_zap(dmu_tx_t *tx, uint64_t object, int add, char *name)
585 {
586 	ASSERT(tx->tx_txg == 0);
587 
588 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_ZAP,
589 	    dmu_tx_hold_zap_impl, add, (uintptr_t)name);
590 }
591 
592 void
593 dmu_tx_hold_bonus(dmu_tx_t *tx, uint64_t object)
594 {
595 	ASSERT(tx->tx_txg == 0);
596 
597 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_BONUS,
598 	    dmu_tx_hold_write_impl, 0, 0);
599 }
600 
601 
602 /* ARGSUSED */
603 static void
604 dmu_tx_hold_space_impl(dmu_tx_t *tx, dnode_t *dn,
605     uint64_t space, uint64_t unused)
606 {
607 	tx->tx_space_towrite += space;
608 }
609 
610 void
611 dmu_tx_hold_space(dmu_tx_t *tx, uint64_t space)
612 {
613 	ASSERT(tx->tx_txg == 0);
614 
615 	dmu_tx_hold_object_impl(tx, tx->tx_objset, DMU_NEW_OBJECT, THT_SPACE,
616 	    dmu_tx_hold_space_impl, space, 0);
617 }
618 
619 int
620 dmu_tx_holds(dmu_tx_t *tx, uint64_t object)
621 {
622 	dmu_tx_hold_t *dth;
623 	int holds = 0;
624 
625 	/*
626 	 * By asserting that the tx is assigned, we're counting the
627 	 * number of dn_tx_holds, which is the same as the number of
628 	 * dn_holds.  Otherwise, we'd be counting dn_holds, but
629 	 * dn_tx_holds could be 0.
630 	 */
631 	ASSERT(tx->tx_txg != 0);
632 
633 	/* if (tx->tx_anyobj == TRUE) */
634 		/* return (0); */
635 
636 	for (dth = list_head(&tx->tx_holds); dth;
637 	    dth = list_next(&tx->tx_holds, dth)) {
638 		if (dth->dth_dnode && dth->dth_dnode->dn_object == object)
639 			holds++;
640 	}
641 
642 	return (holds);
643 }
644 
645 #ifdef ZFS_DEBUG
646 void
647 dmu_tx_dirty_buf(dmu_tx_t *tx, dmu_buf_impl_t *db)
648 {
649 	dmu_tx_hold_t *dth;
650 	int match_object = FALSE, match_offset = FALSE;
651 	dnode_t *dn = db->db_dnode;
652 
653 	ASSERT(tx->tx_txg != 0);
654 	ASSERT(tx->tx_objset == NULL || dn->dn_objset == tx->tx_objset->os);
655 	ASSERT3U(dn->dn_object, ==, db->db.db_object);
656 
657 	if (tx->tx_anyobj)
658 		return;
659 
660 	/* XXX No checking on the meta dnode for now */
661 	if (db->db.db_object == DMU_META_DNODE_OBJECT)
662 		return;
663 
664 	for (dth = list_head(&tx->tx_holds); dth;
665 	    dth = list_next(&tx->tx_holds, dth)) {
666 		ASSERT(dn == NULL || dn->dn_assigned_txg == tx->tx_txg);
667 		if (dth->dth_dnode == dn && dth->dth_type != THT_NEWOBJECT)
668 			match_object = TRUE;
669 		if (dth->dth_dnode == NULL || dth->dth_dnode == dn) {
670 			int datablkshift = dn->dn_datablkshift ?
671 			    dn->dn_datablkshift : SPA_MAXBLOCKSHIFT;
672 			int epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
673 			int shift = datablkshift + epbs * db->db_level;
674 			uint64_t beginblk = shift >= 64 ? 0 :
675 			    (dth->dth_arg1 >> shift);
676 			uint64_t endblk = shift >= 64 ? 0 :
677 			    ((dth->dth_arg1 + dth->dth_arg2 - 1) >> shift);
678 			uint64_t blkid = db->db_blkid;
679 
680 			/* XXX dth_arg2 better not be zero... */
681 
682 			dprintf("found dth type %x beginblk=%llx endblk=%llx\n",
683 			    dth->dth_type, beginblk, endblk);
684 
685 			switch (dth->dth_type) {
686 			case THT_WRITE:
687 				if (blkid >= beginblk && blkid <= endblk)
688 					match_offset = TRUE;
689 				/*
690 				 * We will let this hold work for the bonus
691 				 * buffer so that we don't need to hold it
692 				 * when creating a new object.
693 				 */
694 				if (blkid == DB_BONUS_BLKID)
695 					match_offset = TRUE;
696 				/*
697 				 * They might have to increase nlevels,
698 				 * thus dirtying the new TLIBs.  Or the
699 				 * might have to change the block size,
700 				 * thus dirying the new lvl=0 blk=0.
701 				 */
702 				if (blkid == 0)
703 					match_offset = TRUE;
704 				break;
705 			case THT_FREE:
706 				if (blkid == beginblk &&
707 				    (dth->dth_arg1 != 0 ||
708 				    dn->dn_maxblkid == 0))
709 					match_offset = TRUE;
710 				if (blkid == endblk &&
711 				    dth->dth_arg2 != DMU_OBJECT_END)
712 					match_offset = TRUE;
713 				break;
714 			case THT_BONUS:
715 				if (blkid == DB_BONUS_BLKID)
716 					match_offset = TRUE;
717 				break;
718 			case THT_ZAP:
719 				match_offset = TRUE;
720 				break;
721 			case THT_NEWOBJECT:
722 				match_object = TRUE;
723 				break;
724 			default:
725 				ASSERT(!"bad dth_type");
726 			}
727 		}
728 		if (match_object && match_offset)
729 			return;
730 	}
731 	panic("dirtying dbuf obj=%llx lvl=%u blkid=%llx but not tx_held\n",
732 	    (u_longlong_t)db->db.db_object, db->db_level,
733 	    (u_longlong_t)db->db_blkid);
734 }
735 #endif
736 
737 static int
738 dmu_tx_try_assign(dmu_tx_t *tx, uint64_t txg_how, dmu_tx_hold_t **last_dth)
739 {
740 	dmu_tx_hold_t *dth;
741 	uint64_t lsize, asize, fsize, towrite;
742 
743 	*last_dth = NULL;
744 
745 	tx->tx_txg = txg_hold_open(tx->tx_pool, &tx->tx_txgh);
746 
747 	if (txg_how >= TXG_INITIAL && txg_how != tx->tx_txg)
748 		return (ERESTART);
749 	if (tx->tx_err)
750 		return (tx->tx_err);
751 
752 	for (dth = list_head(&tx->tx_holds); dth;
753 	    dth = list_next(&tx->tx_holds, dth)) {
754 		dnode_t *dn = dth->dth_dnode;
755 		if (dn != NULL) {
756 			mutex_enter(&dn->dn_mtx);
757 			while (dn->dn_assigned_txg == tx->tx_txg - 1) {
758 				if (txg_how != TXG_WAIT) {
759 					mutex_exit(&dn->dn_mtx);
760 					return (ERESTART);
761 				}
762 				cv_wait(&dn->dn_notxholds, &dn->dn_mtx);
763 			}
764 			if (dn->dn_assigned_txg == 0) {
765 				ASSERT(dn->dn_assigned_tx == NULL);
766 				dn->dn_assigned_txg = tx->tx_txg;
767 				dn->dn_assigned_tx = tx;
768 			} else {
769 				ASSERT(dn->dn_assigned_txg == tx->tx_txg);
770 				if (dn->dn_assigned_tx != tx)
771 					dn->dn_assigned_tx = NULL;
772 			}
773 			(void) refcount_add(&dn->dn_tx_holds, tx);
774 			mutex_exit(&dn->dn_mtx);
775 		}
776 		*last_dth = dth;
777 		if (tx->tx_err)
778 			return (tx->tx_err);
779 	}
780 
781 	/*
782 	 * If a snapshot has been taken since we made our estimates,
783 	 * assume that we won't be able to free or overwrite anything.
784 	 */
785 	if (tx->tx_objset &&
786 	    dsl_dataset_prev_snap_txg(tx->tx_objset->os->os_dsl_dataset) >
787 	    tx->tx_lastsnap_txg) {
788 		tx->tx_space_towrite += tx->tx_space_tooverwrite;
789 		tx->tx_space_tooverwrite = 0;
790 		tx->tx_space_tofree = 0;
791 	}
792 
793 	/*
794 	 * Convert logical size to worst-case allocated size.
795 	 */
796 	fsize = spa_get_asize(tx->tx_pool->dp_spa, tx->tx_space_tooverwrite) +
797 	    tx->tx_space_tofree;
798 	lsize = tx->tx_space_towrite + tx->tx_space_tooverwrite;
799 	asize = spa_get_asize(tx->tx_pool->dp_spa, lsize);
800 	towrite = tx->tx_space_towrite;
801 	tx->tx_space_towrite = asize;
802 
803 	if (tx->tx_dir && asize != 0) {
804 		int err = dsl_dir_tempreserve_space(tx->tx_dir,
805 		    lsize, asize, fsize, &tx->tx_tempreserve_cookie, tx);
806 		if (err) {
807 			tx->tx_space_towrite = towrite;
808 			return (err);
809 		}
810 	}
811 
812 	return (0);
813 }
814 
815 static uint64_t
816 dmu_tx_unassign(dmu_tx_t *tx, dmu_tx_hold_t *last_dth)
817 {
818 	uint64_t txg = tx->tx_txg;
819 	dmu_tx_hold_t *dth;
820 
821 	ASSERT(txg != 0);
822 
823 	txg_rele_to_quiesce(&tx->tx_txgh);
824 
825 	for (dth = last_dth; dth; dth = list_prev(&tx->tx_holds, dth)) {
826 		dnode_t *dn = dth->dth_dnode;
827 
828 		if (dn == NULL)
829 			continue;
830 		mutex_enter(&dn->dn_mtx);
831 		ASSERT3U(dn->dn_assigned_txg, ==, txg);
832 
833 		if (refcount_remove(&dn->dn_tx_holds, tx) == 0) {
834 			dn->dn_assigned_txg = 0;
835 			dn->dn_assigned_tx = NULL;
836 			cv_broadcast(&dn->dn_notxholds);
837 		}
838 		mutex_exit(&dn->dn_mtx);
839 	}
840 
841 	txg_rele_to_sync(&tx->tx_txgh);
842 
843 	tx->tx_txg = 0;
844 	return (txg);
845 }
846 
847 /*
848  * Assign tx to a transaction group.  txg_how can be one of:
849  *
850  * (1)	TXG_WAIT.  If the current open txg is full, waits until there's
851  *	a new one.  This should be used when you're not holding locks.
852  *	If will only fail if we're truly out of space (or over quota).
853  *
854  * (2)	TXG_NOWAIT.  If we can't assign into the current open txg without
855  *	blocking, returns immediately with ERESTART.  This should be used
856  *	whenever you're holding locks.  On an ERESTART error, the caller
857  *	should drop locks, do a txg_wait_open(dp, 0), and try again.
858  *
859  * (3)	A specific txg.  Use this if you need to ensure that multiple
860  *	transactions all sync in the same txg.  Like TXG_NOWAIT, it
861  *	returns ERESTART if it can't assign you into the requested txg.
862  */
863 int
864 dmu_tx_assign(dmu_tx_t *tx, uint64_t txg_how)
865 {
866 	dmu_tx_hold_t *last_dth;
867 	int err;
868 
869 	ASSERT(tx->tx_txg == 0);
870 	ASSERT(txg_how != 0);
871 	ASSERT(!dsl_pool_sync_context(tx->tx_pool));
872 
873 	while ((err = dmu_tx_try_assign(tx, txg_how, &last_dth)) != 0) {
874 		uint64_t txg = dmu_tx_unassign(tx, last_dth);
875 
876 		if (err != ERESTART || txg_how != TXG_WAIT)
877 			return (err);
878 
879 		txg_wait_open(tx->tx_pool, txg + 1);
880 	}
881 
882 	txg_rele_to_quiesce(&tx->tx_txgh);
883 
884 	return (0);
885 }
886 
887 void
888 dmu_tx_willuse_space(dmu_tx_t *tx, int64_t delta)
889 {
890 	if (tx->tx_dir == NULL || delta == 0)
891 		return;
892 
893 	if (delta > 0) {
894 		ASSERT3U(refcount_count(&tx->tx_space_written) + delta, <=,
895 		    tx->tx_space_towrite);
896 		(void) refcount_add_many(&tx->tx_space_written, delta, NULL);
897 	} else {
898 		(void) refcount_add_many(&tx->tx_space_freed, -delta, NULL);
899 	}
900 }
901 
902 void
903 dmu_tx_commit(dmu_tx_t *tx)
904 {
905 	dmu_tx_hold_t *dth;
906 
907 	ASSERT(tx->tx_txg != 0);
908 
909 	while (dth = list_head(&tx->tx_holds)) {
910 		dnode_t *dn = dth->dth_dnode;
911 
912 		list_remove(&tx->tx_holds, dth);
913 		kmem_free(dth, sizeof (dmu_tx_hold_t));
914 		if (dn == NULL)
915 			continue;
916 		mutex_enter(&dn->dn_mtx);
917 		ASSERT3U(dn->dn_assigned_txg, ==, tx->tx_txg);
918 
919 		if (refcount_remove(&dn->dn_tx_holds, tx) == 0) {
920 			dn->dn_assigned_txg = 0;
921 			dn->dn_assigned_tx = NULL;
922 			cv_broadcast(&dn->dn_notxholds);
923 		}
924 		mutex_exit(&dn->dn_mtx);
925 		dnode_rele(dn, tx);
926 	}
927 
928 	if (tx->tx_dir && tx->tx_space_towrite > 0) {
929 		dsl_dir_tempreserve_clear(tx->tx_tempreserve_cookie, tx);
930 	}
931 
932 	if (tx->tx_anyobj == FALSE)
933 		txg_rele_to_sync(&tx->tx_txgh);
934 	dprintf("towrite=%llu written=%llu tofree=%llu freed=%llu\n",
935 	    tx->tx_space_towrite, refcount_count(&tx->tx_space_written),
936 	    tx->tx_space_tofree, refcount_count(&tx->tx_space_freed));
937 	refcount_destroy_many(&tx->tx_space_written,
938 	    refcount_count(&tx->tx_space_written));
939 	refcount_destroy_many(&tx->tx_space_freed,
940 	    refcount_count(&tx->tx_space_freed));
941 #ifdef ZFS_DEBUG
942 	if (tx->tx_debug_buf)
943 		kmem_free(tx->tx_debug_buf, 4096);
944 #endif
945 	kmem_free(tx, sizeof (dmu_tx_t));
946 }
947 
948 void
949 dmu_tx_abort(dmu_tx_t *tx)
950 {
951 	dmu_tx_hold_t *dth;
952 
953 	ASSERT(tx->tx_txg == 0);
954 
955 	while (dth = list_head(&tx->tx_holds)) {
956 		dnode_t *dn = dth->dth_dnode;
957 
958 		list_remove(&tx->tx_holds, dth);
959 		kmem_free(dth, sizeof (dmu_tx_hold_t));
960 		if (dn != NULL)
961 			dnode_rele(dn, tx);
962 	}
963 	refcount_destroy_many(&tx->tx_space_written,
964 	    refcount_count(&tx->tx_space_written));
965 	refcount_destroy_many(&tx->tx_space_freed,
966 	    refcount_count(&tx->tx_space_freed));
967 #ifdef ZFS_DEBUG
968 	if (tx->tx_debug_buf)
969 		kmem_free(tx->tx_debug_buf, 4096);
970 #endif
971 	kmem_free(tx, sizeof (dmu_tx_t));
972 }
973 
974 uint64_t
975 dmu_tx_get_txg(dmu_tx_t *tx)
976 {
977 	ASSERT(tx->tx_txg != 0);
978 	return (tx->tx_txg);
979 }
980