xref: /illumos-gate/usr/src/uts/common/fs/zfs/dmu_tx.c (revision f65e61c04bc28ffd6bda04619c84330b420450b5)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dbuf.h>
31 #include <sys/dmu_tx.h>
32 #include <sys/dmu_objset.h>
33 #include <sys/dsl_dataset.h> /* for dsl_dataset_block_freeable() */
34 #include <sys/dsl_dir.h> /* for dsl_dir_tempreserve_*() */
35 #include <sys/dsl_pool.h>
36 #include <sys/zap_impl.h>	/* for ZAP_BLOCK_SHIFT */
37 #include <sys/spa.h>
38 #include <sys/zfs_context.h>
39 
40 #ifdef ZFS_DEBUG
41 int dmu_use_tx_debug_bufs = 1;
42 #endif
43 
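/*
 * Create a transaction handle associated with the given dsl_dir.
 * The dir may be NULL, in which case the caller is responsible for
 * setting tx_pool itself (as dmu_tx_create_assigned() does).
 */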
44 dmu_tx_t *
45 dmu_tx_create_ds(dsl_dir_t *dd)
46 {
47 	dmu_tx_t *tx = kmem_zalloc(sizeof (dmu_tx_t), KM_SLEEP);
48 	tx->tx_dir = dd;
49 	if (dd)
50 		tx->tx_pool = dd->dd_pool;
51 	list_create(&tx->tx_holds, sizeof (dmu_tx_hold_t),
52 	    offsetof(dmu_tx_hold_t, dth_node));
53 	refcount_create(&tx->tx_space_written);
54 	refcount_create(&tx->tx_space_freed);
55 	return (tx);
56 }
57 
58 dmu_tx_t *
59 dmu_tx_create(objset_t *os)
60 {
61 	dmu_tx_t *tx = dmu_tx_create_ds(os->os->os_dsl_dataset->ds_dir);
62 	tx->tx_objset = os;
63 	return (tx);
64 }
65 
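/*
 * Create a transaction that is already assigned to the given txg.
 * No dnode holds are taken, and any object may be manipulated
 * (tx_anyobj); see dmu_tx_is_syncing() and dmu_tx_private_ok().
 */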
66 dmu_tx_t *
67 dmu_tx_create_assigned(struct dsl_pool *dp, uint64_t txg)
68 {
69 	dmu_tx_t *tx = dmu_tx_create_ds(NULL);
70 
71 	ASSERT3U(txg, <=, dp->dp_tx.tx_open_txg);
72 	tx->tx_pool = dp;
73 	tx->tx_txg = txg;
74 	tx->tx_anyobj = TRUE;
75 
76 	return (tx);
77 }
78 
79 int
80 dmu_tx_is_syncing(dmu_tx_t *tx)
81 {
82 	return (tx->tx_anyobj);
83 }
84 
85 int
86 dmu_tx_private_ok(dmu_tx_t *tx)
87 {
88 	return (tx->tx_anyobj || tx->tx_privateobj);
89 }
90 
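/*
 * Record a hold on 'object': take a dnode hold (unless the object is
 * DMU_NEW_OBJECT), bind the dnode to the txg if the tx is already
 * assigned, and append a dmu_tx_hold_t describing the hold type, its
 * accounting callback, and its arguments to the tx's hold list.
 */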
91 static void
92 dmu_tx_hold_object_impl(dmu_tx_t *tx, objset_t *os, uint64_t object,
93     enum dmu_tx_hold_type type, dmu_tx_hold_func_t func,
94     uint64_t arg1, uint64_t arg2)
95 {
96 	dmu_tx_hold_t *dth;
97 	dnode_t *dn = NULL;
98 
99 	if (object != DMU_NEW_OBJECT) {
100 		dn = dnode_hold(os->os, object, tx);
101 
102 		if (tx->tx_txg != 0) {
103 			mutex_enter(&dn->dn_mtx);
104 			/*
105 			 * dn->dn_assigned_txg == tx->tx_txg doesn't pose a
106 			 * problem, but there's no way for it to happen (for
107 			 * now, at least).
108 			 */
109 			ASSERT(dn->dn_assigned_txg == 0);
110 			ASSERT(dn->dn_assigned_tx == NULL);
111 			dn->dn_assigned_txg = tx->tx_txg;
112 			dn->dn_assigned_tx = tx;
113 			(void) refcount_add(&dn->dn_tx_holds, tx);
114 			mutex_exit(&dn->dn_mtx);
115 		}
116 	}
117 
118 	dth = kmem_zalloc(sizeof (dmu_tx_hold_t), KM_SLEEP);
119 	dth->dth_dnode = dn;
120 	dth->dth_type = type;
121 	dth->dth_func = func;
122 	dth->dth_arg1 = arg1;
123 	dth->dth_arg2 = arg2;
124 	/*
125 	 * XXX Investigate using a different data structure to keep
126 	 * track of dnodes in a tx.  Maybe an array, since there will
127 	 * generally not be many entries?
128 	 */
129 	list_insert_tail(&tx->tx_holds, dth);
130 }
131 
132 void
133 dmu_tx_add_new_object(dmu_tx_t *tx, objset_t *os, uint64_t object)
134 {
135 	/*
136 	 * If we're syncing, the caller can manipulate any object anyhow,
137 	 * and the hold on the dnode_t can cause problems.
138 	 */
139 	if (!dmu_tx_is_syncing(tx)) {
140 		dmu_tx_hold_object_impl(tx, os, object, THT_NEWOBJECT,
141 		    NULL, 0, 0);
142 	}
143 }
144 
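/*
 * Charge tx_space_towrite with a worst-case estimate of the space
 * needed to write 'len' bytes at offset 'off': the data blocks
 * touched plus the indirect blocks above them at every level.
 */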
145 /* ARGSUSED */
146 static void
147 dmu_tx_count_write(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
148 {
149 	uint64_t start, end, space;
150 	int min_bs, max_bs, min_ibs, max_ibs, epbs, bits;
151 
152 	if (len == 0)
153 		return;
154 
155 	min_bs = SPA_MINBLOCKSHIFT;
156 	max_bs = SPA_MAXBLOCKSHIFT;
157 	min_ibs = DN_MIN_INDBLKSHIFT;
158 	max_ibs = DN_MAX_INDBLKSHIFT;
159 
160 	/*
161 	 * If there's more than one block, the blocksize can't change,
162 	 * so we can make a more precise estimate.  Alternatively,
163 	 * if the dnode's ibs is larger than max_ibs, always use that.
164 	 * This ensures that if we reduce DN_MAX_INDBLKSHIFT,
165 	 * the code will still work correctly on existing pools.
166 	 */
167 	if (dn && (dn->dn_maxblkid != 0 || dn->dn_indblkshift > max_ibs)) {
168 		min_ibs = max_ibs = dn->dn_indblkshift;
169 		if (dn->dn_datablkshift != 0)
170 			min_bs = max_bs = dn->dn_datablkshift;
171 	}
172 
173 	/*
174 	 * 'end' is the last thing we will access, not one past.
175 	 * This way we won't overflow when accessing the last byte.
176 	 */
177 	start = P2ALIGN(off, 1ULL << max_bs);
178 	end = P2ROUNDUP(off + len, 1ULL << max_bs) - 1;
179 	space = end - start + 1;
180 
181 	start >>= min_bs;
182 	end >>= min_bs;
183 
184 	epbs = min_ibs - SPA_BLKPTRSHIFT;
185 
186 	/*
187 	 * The object contains at most 2^(64 - min_bs) blocks,
188 	 * and each indirect level maps 2^epbs.
189 	 */
190 	for (bits = 64 - min_bs; bits >= 0; bits -= epbs) {
191 		start >>= epbs;
192 		end >>= epbs;
193 		/*
194 		 * If we increase the number of levels of indirection,
195 		 * we'll need new blkid=0 indirect blocks.  If start == 0,
196 		 * we're already accounting for those blocks; and if end == 0,
197 		 * we can't increase the number of levels beyond that.
198 		 */
199 		if (start != 0 && end != 0)
200 			space += 1ULL << max_ibs;
201 		space += (end - start + 1) << max_ibs;
202 	}
203 
204 	ASSERT(space < 2 * DMU_MAX_ACCESS);
205 
206 	tx->tx_space_towrite += space;
207 }
208 
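/*
 * Charge the space needed to rewrite the dnode itself (one
 * dnode-sized write into the meta-dnode).  If the dnode's current
 * block is freeable, move the charge from tx_space_towrite to
 * tx_space_tooverwrite.
 */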
209 static void
210 dmu_tx_count_dnode(dmu_tx_t *tx, dnode_t *dn)
211 {
212 	dnode_t *mdn = tx->tx_objset->os->os_meta_dnode;
213 	uint64_t object = dn ? dn->dn_object : DN_MAX_OBJECT - 1;
214 	uint64_t pre_write_space;
215 
216 	ASSERT(object < DN_MAX_OBJECT);
217 	pre_write_space = tx->tx_space_towrite;
218 	dmu_tx_count_write(tx, mdn, object << DNODE_SHIFT, 1 << DNODE_SHIFT);
219 	if (dn && dn->dn_dbuf->db_blkptr &&
220 	    dsl_dataset_block_freeable(dn->dn_objset->os_dsl_dataset,
221 	    dn->dn_dbuf->db_blkptr->blk_birth, tx)) {
222 		tx->tx_space_tooverwrite +=
223 			tx->tx_space_towrite - pre_write_space;
224 		tx->tx_space_towrite = pre_write_space;
225 	}
226 }
227 
228 /* ARGSUSED */
229 static void
230 dmu_tx_hold_write_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
231 {
232 	dmu_tx_count_write(tx, dn, off, len);
233 	dmu_tx_count_dnode(tx, dn);
234 }
235 
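/*
 * Declare that this transaction will write 'len' bytes at offset
 * 'off' of the given object.
 */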
236 void
237 dmu_tx_hold_write(dmu_tx_t *tx, uint64_t object, uint64_t off, int len)
238 {
239 	ASSERT(tx->tx_txg == 0);
240 	ASSERT(len > 0 && len < DMU_MAX_ACCESS);
241 	ASSERT(UINT64_MAX - off >= len - 1);
242 
243 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_WRITE,
244 	    dmu_tx_hold_write_impl, off, len);
245 }
246 
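/*
 * Estimate how much allocated space freeing the range [off, off+len)
 * would return to the pool: walk the block pointers it covers and add
 * the ASIZE of every block that dsl_dataset_block_freeable() reports
 * as freeable to tx_space_tofree.  The walk is capped at 128K blocks.
 */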
247 static void
248 dmu_tx_count_free(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
249 {
250 	uint64_t blkid, nblks;
251 	uint64_t space = 0;
252 	dsl_dataset_t *ds = dn->dn_objset->os_dsl_dataset;
253 
254 	ASSERT(dn->dn_assigned_tx == tx || dn->dn_assigned_tx == NULL);
255 
256 	if (dn->dn_datablkshift == 0)
257 		return;
258 	/*
259 	 * It's not that the dnode can change (it isn't dirty), but
260 	 * dbuf_hold_impl() wants us to have the struct_rwlock.
261 	 * We also need it to protect dn_maxblkid.
262 	 */
263 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
264 	blkid = off >> dn->dn_datablkshift;
265 	nblks = (off + len) >> dn->dn_datablkshift;
266 
267 	if (blkid >= dn->dn_maxblkid)
268 		goto out;
269 	if (blkid + nblks > dn->dn_maxblkid)
270 		nblks = dn->dn_maxblkid - blkid;
271 
272 	/* don't bother past the first 128K blocks */
273 	nblks = MIN(nblks, 128*1024);
274 
275 	if (dn->dn_phys->dn_nlevels == 1) {
276 		int i;
277 		for (i = 0; i < nblks; i++) {
278 			blkptr_t *bp = dn->dn_phys->dn_blkptr;
279 			ASSERT3U(blkid + i, <, dn->dn_phys->dn_nblkptr);
280 			bp += blkid + i;
281 			if (dsl_dataset_block_freeable(ds, bp->blk_birth, tx)) {
282 				dprintf_bp(bp, "can free old%s", "");
283 				space += BP_GET_ASIZE(bp);
284 			}
285 		}
286 		goto out;
287 	}
288 
289 	while (nblks) {
290 		dmu_buf_impl_t *dbuf;
291 		int err, epbs, blkoff, tochk;
292 
293 		epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
294 		blkoff = P2PHASE(blkid, 1<<epbs);
295 		tochk = MIN((1<<epbs) - blkoff, nblks);
296 
297 		err = dbuf_hold_impl(dn, 1, blkid >> epbs, TRUE, FTAG, &dbuf);
298 		if (err == 0) {
299 			int i;
300 			blkptr_t *bp;
301 
302 			dbuf_read_havestruct(dbuf);
303 
304 			bp = dbuf->db.db_data;
305 			bp += blkoff;
306 
307 			for (i = 0; i < tochk; i++) {
308 				if (dsl_dataset_block_freeable(ds,
309 				    bp[i].blk_birth, tx)) {
310 					dprintf_bp(&bp[i],
311 					    "can free old%s", "");
312 					space += BP_GET_ASIZE(&bp[i]);
313 				}
314 			}
315 			dbuf_remove_ref(dbuf, FTAG);
316 		} else {
317 			/* the indirect block is sparse */
318 			ASSERT(err == ENOENT);
319 		}
320 
321 		blkid += tochk;
322 		nblks -= tochk;
323 	}
324 out:
325 	rw_exit(&dn->dn_struct_rwlock);
326 
327 	tx->tx_space_tofree += space;
328 }
329 
330 static void
331 dmu_tx_hold_free_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
332 {
333 	int dirty;
334 
335 	/* first block */
336 	if (off != 0 /* || dn->dn_maxblkid == 0 */)
337 		dmu_tx_count_write(tx, dn, off, 1);
338 	/* last block */
339 	if (len != DMU_OBJECT_END)
340 		dmu_tx_count_write(tx, dn, off+len, 1);
341 
342 	dmu_tx_count_dnode(tx, dn);
343 
344 	if (off >= (dn->dn_maxblkid+1) * dn->dn_datablksz)
345 		return;
346 	if (len == DMU_OBJECT_END)
347 		len = (dn->dn_maxblkid+1) * dn->dn_datablksz - off;
348 
349 	/* XXX locking */
350 	dirty = dn->dn_dirtyblksz[0] | dn->dn_dirtyblksz[1] |
351 	    dn->dn_dirtyblksz[2] | dn->dn_dirtyblksz[3];
352 	if (dn->dn_assigned_tx != NULL && !dirty)
353 		dmu_tx_count_free(tx, dn, off, len);
354 }
355 
356 void
357 dmu_tx_hold_free(dmu_tx_t *tx, uint64_t object, uint64_t off, uint64_t len)
358 {
359 	ASSERT(tx->tx_txg == 0);
360 
361 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_FREE,
362 	    dmu_tx_hold_free_impl, off, len);
363 }
364 
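/*
 * Space accounting for ZAP operations: 'nops' operations that add
 * entries and 'cops' operations that only touch existing entries
 * (see dmu_tx_hold_zap() for how the caller's 'ops' is split).
 */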
365 /* ARGSUSED */
366 static void
367 dmu_tx_hold_zap_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t nops, uint64_t cops)
368 {
369 	uint64_t nblocks;
370 	int epbs;
371 
372 	dmu_tx_count_dnode(tx, dn);
373 
374 	if (dn == NULL) {
375 		/*
376 		 * Assuming that nops+cops is not super huge, we will be
377 		 * able to fit a new object's entries into one leaf
378 		 * block.  So there will be at most 2 blocks total,
379 		 * including the header block.
380 		 */
381 		dmu_tx_count_write(tx, dn, 0, 2 << fzap_default_block_shift);
382 		return;
383 	}
384 
385 	ASSERT3P(dmu_ot[dn->dn_type].ot_byteswap, ==, zap_byteswap);
386 
387 	if (dn->dn_maxblkid == 0 && nops == 0) {
388 		/*
389 		 * If there is only one block (i.e. this is a micro-zap)
390 		 * and we are only doing updates, the accounting is simple.
391 		 */
392 		if (dsl_dataset_block_freeable(dn->dn_objset->os_dsl_dataset,
393 		    dn->dn_phys->dn_blkptr[0].blk_birth, tx))
394 			tx->tx_space_tooverwrite += dn->dn_datablksz;
395 		else
396 			tx->tx_space_towrite += dn->dn_datablksz;
397 		return;
398 	}
399 
400 	/*
401 	 * 3 blocks overwritten per op: target leaf, ptrtbl block, header block
402 	 * 3 new blocks written per op: new split leaf, 2 grown ptrtbl blocks
403 	 */
404 	dmu_tx_count_write(tx, dn, dn->dn_maxblkid * dn->dn_datablksz,
405 	    (nops * 6ULL + cops * 3ULL) << dn->dn_datablkshift);
406 
407 	/*
408 	 * If the modified blocks are scattered to the four winds,
409 	 * we'll have to modify an indirect twig for each.
410 	 */
411 	epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
412 	for (nblocks = dn->dn_maxblkid >> epbs; nblocks != 0; nblocks >>= epbs)
413 		tx->tx_space_towrite +=
414 		    ((nops + cops) * 3ULL) << dn->dn_indblkshift;
415 }
416 
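/*
 * Declare ZAP activity against the given object.  A positive 'ops'
 * means that many entries will be added; a negative 'ops' means that
 * many operations will only touch existing entries.
 */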
417 void
418 dmu_tx_hold_zap(dmu_tx_t *tx, uint64_t object, int ops)
419 {
420 	ASSERT(tx->tx_txg == 0);
421 
422 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_ZAP,
423 	    dmu_tx_hold_zap_impl, (ops > 0?ops:0), (ops < 0?-ops:0));
424 }
425 
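/*
 * Declare that this transaction will modify the bonus buffer (or
 * other dnode-resident state) of the given object.
 */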
426 void
427 dmu_tx_hold_bonus(dmu_tx_t *tx, uint64_t object)
428 {
429 	ASSERT(tx->tx_txg == 0);
430 
431 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_BONUS,
432 	    dmu_tx_hold_write_impl, 0, 0);
433 }
434 
435 
436 /* ARGSUSED */
437 static void
438 dmu_tx_hold_space_impl(dmu_tx_t *tx, dnode_t *dn,
439     uint64_t space, uint64_t unused)
440 {
441 	tx->tx_space_towrite += space;
442 }
443 
444 void
445 dmu_tx_hold_space(dmu_tx_t *tx, uint64_t space)
446 {
447 	ASSERT(tx->tx_txg == 0);
448 
449 	dmu_tx_hold_object_impl(tx, tx->tx_objset, DMU_NEW_OBJECT, THT_SPACE,
450 	    dmu_tx_hold_space_impl, space, 0);
451 }
452 
453 int
454 dmu_tx_holds(dmu_tx_t *tx, uint64_t object)
455 {
456 	dmu_tx_hold_t *dth;
457 	int holds = 0;
458 
459 	/*
460 	 * By asserting that the tx is assigned, we're counting the
461 	 * number of dn_tx_holds, which is the same as the number of
462 	 * dn_holds.  Otherwise, we'd be counting dn_holds, but
463 	 * dn_tx_holds could be 0.
464 	 */
465 	ASSERT(tx->tx_txg != 0);
466 
467 	/* if (tx->tx_anyobj == TRUE) */
468 		/* return (0); */
469 
470 	for (dth = list_head(&tx->tx_holds); dth;
471 	    dth = list_next(&tx->tx_holds, dth)) {
472 		if (dth->dth_dnode && dth->dth_dnode->dn_object == object)
473 			holds++;
474 	}
475 
476 	return (holds);
477 }
478 
479 #ifdef ZFS_DEBUG
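/*
 * Debug check: verify that the dbuf being dirtied is covered by one
 * of this transaction's holds, and panic if no matching hold is found.
 */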
480 void
481 dmu_tx_dirty_buf(dmu_tx_t *tx, dmu_buf_impl_t *db)
482 {
483 	dmu_tx_hold_t *dth;
484 	int match_object = FALSE, match_offset = FALSE;
485 	dnode_t *dn = db->db_dnode;
486 
487 	ASSERT(tx->tx_txg != 0);
488 	ASSERT(tx->tx_objset == NULL || dn->dn_objset == tx->tx_objset->os);
489 	ASSERT3U(dn->dn_object, ==, db->db.db_object);
490 
491 	if (tx->tx_anyobj)
492 		return;
493 
494 	/* XXX No checking on the meta dnode for now */
495 	if (db->db.db_object & DMU_PRIVATE_OBJECT)
496 		return;
497 
498 	for (dth = list_head(&tx->tx_holds); dth;
499 	    dth = list_next(&tx->tx_holds, dth)) {
500 		ASSERT(dn == NULL || dn->dn_assigned_txg == tx->tx_txg);
501 		if (dth->dth_dnode == dn && dth->dth_type != THT_NEWOBJECT)
502 			match_object = TRUE;
503 		if (dth->dth_dnode == NULL || dth->dth_dnode == dn) {
504 			int datablkshift = dn->dn_datablkshift ?
505 			    dn->dn_datablkshift : SPA_MAXBLOCKSHIFT;
506 			int epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
507 			int shift = datablkshift + epbs * db->db_level;
508 			uint64_t beginblk = shift >= 64 ? 0 :
509 			    (dth->dth_arg1 >> shift);
510 			uint64_t endblk = shift >= 64 ? 0 :
511 			    ((dth->dth_arg1 + dth->dth_arg2 - 1) >> shift);
512 			uint64_t blkid = db->db_blkid;
513 
514 			/* XXX dth_arg2 better not be zero... */
515 
516 			dprintf("found dth type %x beginblk=%llx endblk=%llx\n",
517 			    dth->dth_type, beginblk, endblk);
518 
519 			switch (dth->dth_type) {
520 			case THT_WRITE:
521 				if (blkid >= beginblk && blkid <= endblk)
522 					match_offset = TRUE;
523 				/*
524 				 * We will let this hold work for the bonus
525 				 * buffer so that we don't need to hold it
526 				 * when creating a new object.
527 				 */
528 				if (blkid == DB_BONUS_BLKID)
529 					match_offset = TRUE;
530 				/*
531 				 * They might have to increase nlevels,
532 				 * thus dirtying the new TLIBs.  Or they
533 				 * might have to change the block size,
534 				 * thus dirtying the new lvl=0 blk=0.
535 				 */
536 				if (blkid == 0)
537 					match_offset = TRUE;
538 				break;
539 			case THT_FREE:
540 				if (blkid == beginblk &&
541 				    (dth->dth_arg1 != 0 ||
542 				    dn->dn_maxblkid == 0))
543 					match_offset = TRUE;
544 				if (blkid == endblk &&
545 				    dth->dth_arg2 != DMU_OBJECT_END)
546 					match_offset = TRUE;
547 				break;
548 			case THT_BONUS:
549 				if (blkid == DB_BONUS_BLKID)
550 					match_offset = TRUE;
551 				break;
552 			case THT_ZAP:
553 				match_offset = TRUE;
554 				break;
555 			case THT_NEWOBJECT:
556 				match_object = TRUE;
557 				break;
558 			default:
559 				ASSERT(!"bad dth_type");
560 			}
561 		}
562 		if (match_object && match_offset)
563 			return;
564 	}
565 	panic("dirtying dbuf obj=%llx lvl=%u blkid=%llx but not tx_held\n",
566 	    (u_longlong_t)db->db.db_object, db->db_level,
567 	    (u_longlong_t)db->db_blkid);
568 }
569 #endif
570 
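/*
 * Try to assign the tx to the currently open txg: bind each held
 * dnode to that txg, run each hold's accounting callback, convert the
 * logical write estimate to a worst-case allocated size, and
 * temp-reserve that much space in the dsl_dir.  On failure, *last_dth
 * records how far we got so that dmu_tx_unassign() can back out.
 */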
571 static int
572 dmu_tx_try_assign(dmu_tx_t *tx, uint64_t txg_how, dmu_tx_hold_t **last_dth)
573 {
574 	dmu_tx_hold_t *dth;
575 	uint64_t lsize, asize, fsize;
576 
577 	*last_dth = NULL;
578 
579 	tx->tx_space_towrite = 0;
580 	tx->tx_space_tofree = 0;
581 	tx->tx_space_tooverwrite = 0;
582 	tx->tx_txg = txg_hold_open(tx->tx_pool, &tx->tx_txgh);
583 
584 	if (txg_how >= TXG_INITIAL && txg_how != tx->tx_txg)
585 		return (ERESTART);
586 
587 	for (dth = list_head(&tx->tx_holds); dth;
588 	    *last_dth = dth, dth = list_next(&tx->tx_holds, dth)) {
589 		dnode_t *dn = dth->dth_dnode;
590 		if (dn != NULL) {
591 			mutex_enter(&dn->dn_mtx);
592 			while (dn->dn_assigned_txg == tx->tx_txg - 1) {
593 				if (txg_how != TXG_WAIT) {
594 					mutex_exit(&dn->dn_mtx);
595 					return (ERESTART);
596 				}
597 				cv_wait(&dn->dn_notxholds, &dn->dn_mtx);
598 			}
599 			if (dn->dn_assigned_txg == 0) {
600 				ASSERT(dn->dn_assigned_tx == NULL);
601 				dn->dn_assigned_txg = tx->tx_txg;
602 				dn->dn_assigned_tx = tx;
603 			} else {
604 				ASSERT(dn->dn_assigned_txg == tx->tx_txg);
605 				if (dn->dn_assigned_tx != tx)
606 					dn->dn_assigned_tx = NULL;
607 			}
608 			(void) refcount_add(&dn->dn_tx_holds, tx);
609 			mutex_exit(&dn->dn_mtx);
610 		}
611 		if (dth->dth_func)
612 			dth->dth_func(tx, dn, dth->dth_arg1, dth->dth_arg2);
613 	}
614 
615 	/*
616 	 * Convert logical size to worst-case allocated size.
617 	 */
618 	fsize = spa_get_asize(tx->tx_pool->dp_spa, tx->tx_space_tooverwrite) +
619 	    tx->tx_space_tofree;
620 	lsize = tx->tx_space_towrite + tx->tx_space_tooverwrite;
621 	asize = spa_get_asize(tx->tx_pool->dp_spa, lsize);
622 	tx->tx_space_towrite = asize;
623 
624 	if (tx->tx_dir && asize != 0) {
625 		int err = dsl_dir_tempreserve_space(tx->tx_dir,
626 		    lsize, asize, fsize, &tx->tx_tempreserve_cookie, tx);
627 		if (err)
628 			return (err);
629 	}
630 
631 	return (0);
632 }
633 
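/*
 * Back out a failed dmu_tx_try_assign(): release the txg holds and
 * drop each dnode's binding to the txg, up to and including last_dth,
 * waking any threads waiting in dmu_tx_try_assign().  Returns the txg
 * we were trying to assign into.
 */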
634 static uint64_t
635 dmu_tx_unassign(dmu_tx_t *tx, dmu_tx_hold_t *last_dth)
636 {
637 	uint64_t txg = tx->tx_txg;
638 	dmu_tx_hold_t *dth;
639 
640 	ASSERT(txg != 0);
641 
642 	txg_rele_to_quiesce(&tx->tx_txgh);
643 
644 	for (dth = last_dth; dth; dth = list_prev(&tx->tx_holds, dth)) {
645 		dnode_t *dn = dth->dth_dnode;
646 
647 		if (dn == NULL)
648 			continue;
649 		mutex_enter(&dn->dn_mtx);
650 		ASSERT3U(dn->dn_assigned_txg, ==, txg);
651 
652 		if (refcount_remove(&dn->dn_tx_holds, tx) == 0) {
653 			dn->dn_assigned_txg = 0;
654 			dn->dn_assigned_tx = NULL;
655 			cv_broadcast(&dn->dn_notxholds);
656 		}
657 		mutex_exit(&dn->dn_mtx);
658 	}
659 
660 	txg_rele_to_sync(&tx->tx_txgh);
661 
662 	tx->tx_txg = 0;
663 	return (txg);
664 }
665 
666 /*
667  * Assign tx to a transaction group.  txg_how can be one of:
668  *
669  * (1)	TXG_WAIT.  If the current open txg is full, waits until there's
670  *	a new one.  This should be used when you're not holding locks.
671  *	It will only fail if we're truly out of space (or over quota).
672  *
673  * (2)	TXG_NOWAIT.  If we can't assign into the current open txg without
674  *	blocking, returns immediately with ERESTART.  This should be used
675  *	whenever you're holding locks.  On an ERESTART error, the caller
676  *	should drop locks, do a txg_wait_open(dp, 0), and try again.
677  *
678  * (3)	A specific txg.  Use this if you need to ensure that multiple
679  *	transactions all sync in the same txg.  Like TXG_NOWAIT, it
680  *	returns ERESTART if it can't assign you into the requested txg.
681  */
682 int
683 dmu_tx_assign(dmu_tx_t *tx, uint64_t txg_how)
684 {
685 	dmu_tx_hold_t *last_dth;
686 	int err;
687 
688 	ASSERT(tx->tx_txg == 0);
689 	ASSERT(txg_how != 0);
690 	ASSERT(!dsl_pool_sync_context(tx->tx_pool));
691 	ASSERT3U(tx->tx_space_towrite, ==, 0);
692 	ASSERT3U(tx->tx_space_tofree, ==, 0);
693 
694 	while ((err = dmu_tx_try_assign(tx, txg_how, &last_dth)) != 0) {
695 		uint64_t txg = dmu_tx_unassign(tx, last_dth);
696 
697 		if (err != ERESTART || txg_how != TXG_WAIT)
698 			return (err);
699 
700 		txg_wait_open(tx->tx_pool, txg + 1);
701 	}
702 
703 	txg_rele_to_quiesce(&tx->tx_txgh);
704 
705 	return (0);
706 }
707 
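/*
 * Record that 'delta' bytes of this transaction's reserved space will
 * actually be written (positive delta) or freed (negative delta).
 */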
708 void
709 dmu_tx_willuse_space(dmu_tx_t *tx, int64_t delta)
710 {
711 	if (tx->tx_dir == NULL || delta == 0)
712 		return;
713 
714 	if (delta > 0) {
715 		ASSERT3U(refcount_count(&tx->tx_space_written) + delta, <=,
716 		    tx->tx_space_towrite);
717 		(void) refcount_add_many(&tx->tx_space_written, delta, NULL);
718 	} else {
719 		(void) refcount_add_many(&tx->tx_space_freed, -delta, NULL);
720 	}
721 }
722 
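/*
 * Finish the transaction: drop every dnode hold and the tx's hold on
 * its txg, clear the temporary space reservation, and free the tx.
 */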
723 void
724 dmu_tx_commit(dmu_tx_t *tx)
725 {
726 	dmu_tx_hold_t *dth;
727 
728 	ASSERT(tx->tx_txg != 0);
729 
730 	while (dth = list_head(&tx->tx_holds)) {
731 		dnode_t *dn = dth->dth_dnode;
732 
733 		list_remove(&tx->tx_holds, dth);
734 		kmem_free(dth, sizeof (dmu_tx_hold_t));
735 		if (dn == NULL)
736 			continue;
737 		mutex_enter(&dn->dn_mtx);
738 		ASSERT3U(dn->dn_assigned_txg, ==, tx->tx_txg);
739 
740 		if (refcount_remove(&dn->dn_tx_holds, tx) == 0) {
741 			dn->dn_assigned_txg = 0;
742 			dn->dn_assigned_tx = NULL;
743 			cv_broadcast(&dn->dn_notxholds);
744 		}
745 		mutex_exit(&dn->dn_mtx);
746 		dnode_rele(dn, tx);
747 	}
748 
749 	if (tx->tx_dir && tx->tx_space_towrite > 0) {
750 		dsl_dir_tempreserve_clear(tx->tx_tempreserve_cookie, tx);
751 	}
752 
753 	if (tx->tx_anyobj == FALSE)
754 		txg_rele_to_sync(&tx->tx_txgh);
755 	dprintf("towrite=%llu written=%llu tofree=%llu freed=%llu\n",
756 	    tx->tx_space_towrite, refcount_count(&tx->tx_space_written),
757 	    tx->tx_space_tofree, refcount_count(&tx->tx_space_freed));
758 	refcount_destroy_many(&tx->tx_space_written,
759 	    refcount_count(&tx->tx_space_written));
760 	refcount_destroy_many(&tx->tx_space_freed,
761 	    refcount_count(&tx->tx_space_freed));
762 #ifdef ZFS_DEBUG
763 	if (tx->tx_debug_buf)
764 		kmem_free(tx->tx_debug_buf, 4096);
765 #endif
766 	kmem_free(tx, sizeof (dmu_tx_t));
767 }
768 
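/*
 * Abort a transaction that was never successfully assigned
 * (tx_txg == 0): drop the dnode holds and free the tx.
 */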
769 void
770 dmu_tx_abort(dmu_tx_t *tx)
771 {
772 	dmu_tx_hold_t *dth;
773 
774 	ASSERT(tx->tx_txg == 0);
775 
776 	while (dth = list_head(&tx->tx_holds)) {
777 		dnode_t *dn = dth->dth_dnode;
778 
779 		list_remove(&tx->tx_holds, dth);
780 		kmem_free(dth, sizeof (dmu_tx_hold_t));
781 		if (dn != NULL)
782 			dnode_rele(dn, tx);
783 	}
784 	refcount_destroy_many(&tx->tx_space_written,
785 	    refcount_count(&tx->tx_space_written));
786 	refcount_destroy_many(&tx->tx_space_freed,
787 	    refcount_count(&tx->tx_space_freed));
788 #ifdef ZFS_DEBUG
789 	if (tx->tx_debug_buf)
790 		kmem_free(tx->tx_debug_buf, 4096);
791 #endif
792 	kmem_free(tx, sizeof (dmu_tx_t));
793 }
794 
795 uint64_t
796 dmu_tx_get_txg(dmu_tx_t *tx)
797 {
798 	ASSERT(tx->tx_txg != 0);
799 	return (tx->tx_txg);
800 }
801