xref: /illumos-gate/usr/src/uts/common/fs/zfs/zil.c (revision 770499e185d15678ccb0be57ebc626ad18d93383)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
24  * Copyright (c) 2014 Integros [integros.com]
25  */
26 
27 /* Portions Copyright 2010 Robert Milkowski */
28 
29 #include <sys/zfs_context.h>
30 #include <sys/spa.h>
31 #include <sys/dmu.h>
32 #include <sys/zap.h>
33 #include <sys/arc.h>
34 #include <sys/stat.h>
35 #include <sys/resource.h>
36 #include <sys/zil.h>
37 #include <sys/zil_impl.h>
38 #include <sys/dsl_dataset.h>
39 #include <sys/vdev_impl.h>
40 #include <sys/dmu_tx.h>
41 #include <sys/dsl_pool.h>
42 #include <sys/abd.h>
43 
44 /*
45  * The zfs intent log (ZIL) saves transaction records of system calls
46  * that change the file system in memory with enough information
47  * to be able to replay them. These are stored in memory until
48  * either the DMU transaction group (txg) commits them to the stable pool
49  * and they can be discarded, or they are flushed to the stable log
50  * (also in the pool) due to a fsync, O_DSYNC or other synchronous
51  * requirement. In the event of a panic or power failure, those log
52  * records (transactions) are replayed.
53  *
54  * There is one ZIL per file system. Its on-disk (pool) format consists
55  * of 3 parts:
56  *
57  * 	- ZIL header
58  * 	- ZIL blocks
59  * 	- ZIL records
60  *
61  * A log record holds a system call transaction. Log blocks can
62  * hold many log records and the blocks are chained together.
63  * Each ZIL block contains a block pointer (blkptr_t) to the next
64  * ZIL block in the chain. The ZIL header points to the first
65  * block in the chain. Note there is not a fixed place in the pool
66  * to hold blocks. They are dynamically allocated and freed as
67  * needed from the blocks available. The sketch below shows the ZIL structure:
68  */
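
/*
 * A rough sketch of that structure (where exactly the zil_chain_t lives
 * within a block depends on the block's checksum type; see
 * zil_read_log_block()):
 *
 *	+------------+     +------------------+     +------------------+
 *	| ZIL header |     | ZIL block        |     | ZIL block        |
 *	|  zh_log ---+---->| lr, lr, ...      |  +->| lr, lr, ...      |
 *	+------------+     | zil_chain_t:     |  |  | zil_chain_t:     |
 *	                   |   zc_next_blk ---+--+  |   zc_next_blk    |
 *	                   +------------------+     |   (hole = end)   |
 *	                                            +------------------+
 */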
69 
70 /*
71  * Disable intent logging replay.  This global ZIL switch affects all pools.
72  */
73 int zil_replay_disable = 0;
74 
75 /*
76  * Tunable parameter for debugging or performance analysis.  Setting
77  * zfs_nocacheflush will cause corruption on power loss if a volatile
78  * out-of-order write cache is enabled.
79  */
80 boolean_t zfs_nocacheflush = B_FALSE;
81 
82 static kmem_cache_t *zil_lwb_cache;
83 
84 static void zil_async_to_sync(zilog_t *zilog, uint64_t foid);
85 
86 #define	LWB_EMPTY(lwb) ((BP_GET_LSIZE(&lwb->lwb_blk) - \
87     sizeof (zil_chain_t)) == (lwb->lwb_sz - lwb->lwb_nused))
88 
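/*
 * AVL comparator for the zl_bp_tree: order claimed block pointers by the
 * DVA of their first copy (vdev, then offset).
 */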
89 static int
90 zil_bp_compare(const void *x1, const void *x2)
91 {
92 	const dva_t *dva1 = &((zil_bp_node_t *)x1)->zn_dva;
93 	const dva_t *dva2 = &((zil_bp_node_t *)x2)->zn_dva;
94 
95 	if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
96 		return (-1);
97 	if (DVA_GET_VDEV(dva1) > DVA_GET_VDEV(dva2))
98 		return (1);
99 
100 	if (DVA_GET_OFFSET(dva1) < DVA_GET_OFFSET(dva2))
101 		return (-1);
102 	if (DVA_GET_OFFSET(dva1) > DVA_GET_OFFSET(dva2))
103 		return (1);
104 
105 	return (0);
106 }
107 
108 static void
109 zil_bp_tree_init(zilog_t *zilog)
110 {
111 	avl_create(&zilog->zl_bp_tree, zil_bp_compare,
112 	    sizeof (zil_bp_node_t), offsetof(zil_bp_node_t, zn_node));
113 }
114 
115 static void
116 zil_bp_tree_fini(zilog_t *zilog)
117 {
118 	avl_tree_t *t = &zilog->zl_bp_tree;
119 	zil_bp_node_t *zn;
120 	void *cookie = NULL;
121 
122 	while ((zn = avl_destroy_nodes(t, &cookie)) != NULL)
123 		kmem_free(zn, sizeof (zil_bp_node_t));
124 
125 	avl_destroy(t);
126 }
127 
128 int
129 zil_bp_tree_add(zilog_t *zilog, const blkptr_t *bp)
130 {
131 	avl_tree_t *t = &zilog->zl_bp_tree;
132 	const dva_t *dva;
133 	zil_bp_node_t *zn;
134 	avl_index_t where;
135 
136 	if (BP_IS_EMBEDDED(bp))
137 		return (0);
138 
139 	dva = BP_IDENTITY(bp);
140 
141 	if (avl_find(t, dva, &where) != NULL)
142 		return (SET_ERROR(EEXIST));
143 
144 	zn = kmem_alloc(sizeof (zil_bp_node_t), KM_SLEEP);
145 	zn->zn_dva = *dva;
146 	avl_insert(t, zn, where);
147 
148 	return (0);
149 }
150 
151 static zil_header_t *
152 zil_header_in_syncing_context(zilog_t *zilog)
153 {
154 	return ((zil_header_t *)zilog->zl_header);
155 }
156 
157 static void
158 zil_init_log_chain(zilog_t *zilog, blkptr_t *bp)
159 {
160 	zio_cksum_t *zc = &bp->blk_cksum;
161 
162 	zc->zc_word[ZIL_ZC_GUID_0] = spa_get_random(-1ULL);
163 	zc->zc_word[ZIL_ZC_GUID_1] = spa_get_random(-1ULL);
164 	zc->zc_word[ZIL_ZC_OBJSET] = dmu_objset_id(zilog->zl_os);
165 	zc->zc_word[ZIL_ZC_SEQ] = 1ULL;
166 }
167 
168 /*
169  * Read a log block and make sure it's valid.
170  */
171 static int
172 zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
173     char **end)
174 {
175 	enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
176 	arc_flags_t aflags = ARC_FLAG_WAIT;
177 	arc_buf_t *abuf = NULL;
178 	zbookmark_phys_t zb;
179 	int error;
180 
181 	if (zilog->zl_header->zh_claim_txg == 0)
182 		zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
183 
184 	if (!(zilog->zl_header->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
185 		zio_flags |= ZIO_FLAG_SPECULATIVE;
186 
187 	SET_BOOKMARK(&zb, bp->blk_cksum.zc_word[ZIL_ZC_OBJSET],
188 	    ZB_ZIL_OBJECT, ZB_ZIL_LEVEL, bp->blk_cksum.zc_word[ZIL_ZC_SEQ]);
189 
190 	error = arc_read(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
191 	    ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
192 
193 	if (error == 0) {
194 		zio_cksum_t cksum = bp->blk_cksum;
195 
196 		/*
197 		 * Validate the checksummed log block.
198 		 *
199 		 * Sequence numbers should be... sequential.  The checksum
200 		 * verifier for the next block should be bp's checksum plus 1.
201 		 *
202 		 * Also check the log chain linkage and size used.
203 		 */
204 		cksum.zc_word[ZIL_ZC_SEQ]++;
205 
206 		if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
207 			zil_chain_t *zilc = abuf->b_data;
208 			char *lr = (char *)(zilc + 1);
209 			uint64_t len = zilc->zc_nused - sizeof (zil_chain_t);
210 
211 			if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
212 			    sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk)) {
213 				error = SET_ERROR(ECKSUM);
214 			} else {
215 				ASSERT3U(len, <=, SPA_OLD_MAXBLOCKSIZE);
216 				bcopy(lr, dst, len);
217 				*end = (char *)dst + len;
218 				*nbp = zilc->zc_next_blk;
219 			}
220 		} else {
221 			char *lr = abuf->b_data;
222 			uint64_t size = BP_GET_LSIZE(bp);
223 			zil_chain_t *zilc = (zil_chain_t *)(lr + size) - 1;
224 
225 			if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
226 			    sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk) ||
227 			    (zilc->zc_nused > (size - sizeof (*zilc)))) {
228 				error = SET_ERROR(ECKSUM);
229 			} else {
230 				ASSERT3U(zilc->zc_nused, <=,
231 				    SPA_OLD_MAXBLOCKSIZE);
232 				bcopy(lr, dst, zilc->zc_nused);
233 				*end = (char *)dst + zilc->zc_nused;
234 				*nbp = zilc->zc_next_blk;
235 			}
236 		}
237 
238 		arc_buf_destroy(abuf, &abuf);
239 	}
240 
241 	return (error);
242 }
243 
244 /*
245  * Read a TX_WRITE log data block.
246  */
247 static int
248 zil_read_log_data(zilog_t *zilog, const lr_write_t *lr, void *wbuf)
249 {
250 	enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
251 	const blkptr_t *bp = &lr->lr_blkptr;
252 	arc_flags_t aflags = ARC_FLAG_WAIT;
253 	arc_buf_t *abuf = NULL;
254 	zbookmark_phys_t zb;
255 	int error;
256 
257 	if (BP_IS_HOLE(bp)) {
258 		if (wbuf != NULL)
259 			bzero(wbuf, MAX(BP_GET_LSIZE(bp), lr->lr_length));
260 		return (0);
261 	}
262 
263 	if (zilog->zl_header->zh_claim_txg == 0)
264 		zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
265 
266 	SET_BOOKMARK(&zb, dmu_objset_id(zilog->zl_os), lr->lr_foid,
267 	    ZB_ZIL_LEVEL, lr->lr_offset / BP_GET_LSIZE(bp));
268 
269 	error = arc_read(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
270 	    ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
271 
272 	if (error == 0) {
273 		if (wbuf != NULL)
274 			bcopy(abuf->b_data, wbuf, arc_buf_size(abuf));
275 		arc_buf_destroy(abuf, &abuf);
276 	}
277 
278 	return (error);
279 }
280 
281 /*
282  * Parse the intent log, and call parse_func for each valid record within.
283  */
284 int
285 zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
286     zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
287 {
288 	const zil_header_t *zh = zilog->zl_header;
289 	boolean_t claimed = !!zh->zh_claim_txg;
290 	uint64_t claim_blk_seq = claimed ? zh->zh_claim_blk_seq : UINT64_MAX;
291 	uint64_t claim_lr_seq = claimed ? zh->zh_claim_lr_seq : UINT64_MAX;
292 	uint64_t max_blk_seq = 0;
293 	uint64_t max_lr_seq = 0;
294 	uint64_t blk_count = 0;
295 	uint64_t lr_count = 0;
296 	blkptr_t blk, next_blk;
297 	char *lrbuf, *lrp;
298 	int error = 0;
299 
300 	/*
301 	 * Old logs didn't record the maximum zh_claim_lr_seq.
302 	 */
303 	if (!(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
304 		claim_lr_seq = UINT64_MAX;
305 
306 	/*
307 	 * Starting at the block pointed to by zh_log we read the log chain.
308 	 * For each block in the chain we strongly check that block to
309 	 * ensure its validity.  We stop when an invalid block is found.
310 	 * For each block pointer in the chain we call parse_blk_func().
311 	 * For each record in each valid block we call parse_lr_func().
312 	 * If the log has been claimed, stop if we encounter a sequence
313 	 * number greater than the highest claimed sequence number.
314 	 */
315 	lrbuf = zio_buf_alloc(SPA_OLD_MAXBLOCKSIZE);
316 	zil_bp_tree_init(zilog);
317 
318 	for (blk = zh->zh_log; !BP_IS_HOLE(&blk); blk = next_blk) {
319 		uint64_t blk_seq = blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
320 		int reclen;
321 		char *end;
322 
323 		if (blk_seq > claim_blk_seq)
324 			break;
325 		if ((error = parse_blk_func(zilog, &blk, arg, txg)) != 0)
326 			break;
327 		ASSERT3U(max_blk_seq, <, blk_seq);
328 		max_blk_seq = blk_seq;
329 		blk_count++;
330 
331 		if (max_lr_seq == claim_lr_seq && max_blk_seq == claim_blk_seq)
332 			break;
333 
334 		error = zil_read_log_block(zilog, &blk, &next_blk, lrbuf, &end);
335 		if (error != 0)
336 			break;
337 
338 		for (lrp = lrbuf; lrp < end; lrp += reclen) {
339 			lr_t *lr = (lr_t *)lrp;
340 			reclen = lr->lrc_reclen;
341 			ASSERT3U(reclen, >=, sizeof (lr_t));
342 			if (lr->lrc_seq > claim_lr_seq)
343 				goto done;
344 			if ((error = parse_lr_func(zilog, lr, arg, txg)) != 0)
345 				goto done;
346 			ASSERT3U(max_lr_seq, <, lr->lrc_seq);
347 			max_lr_seq = lr->lrc_seq;
348 			lr_count++;
349 		}
350 	}
351 done:
352 	zilog->zl_parse_error = error;
353 	zilog->zl_parse_blk_seq = max_blk_seq;
354 	zilog->zl_parse_lr_seq = max_lr_seq;
355 	zilog->zl_parse_blk_count = blk_count;
356 	zilog->zl_parse_lr_count = lr_count;
357 
358 	ASSERT(!claimed || !(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID) ||
359 	    (max_blk_seq == claim_blk_seq && max_lr_seq == claim_lr_seq));
360 
361 	zil_bp_tree_fini(zilog);
362 	zio_buf_free(lrbuf, SPA_OLD_MAXBLOCKSIZE);
363 
364 	return (error);
365 }
366 
367 static int
368 zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
369 {
370 	/*
371 	 * Claim log block if not already committed and not already claimed.
372 	 * If tx == NULL, just verify that the block is claimable.
373 	 */
374 	if (BP_IS_HOLE(bp) || bp->blk_birth < first_txg ||
375 	    zil_bp_tree_add(zilog, bp) != 0)
376 		return (0);
377 
378 	return (zio_wait(zio_claim(NULL, zilog->zl_spa,
379 	    tx == NULL ? 0 : first_txg, bp, spa_claim_notify, NULL,
380 	    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB)));
381 }
382 
383 static int
384 zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
385 {
386 	lr_write_t *lr = (lr_write_t *)lrc;
387 	int error;
388 
389 	if (lrc->lrc_txtype != TX_WRITE)
390 		return (0);
391 
392 	/*
393 	 * If the block is not readable, don't claim it.  This can happen
394 	 * in normal operation when a log block is written to disk before
395 	 * some of the dmu_sync() blocks it points to.  In this case, the
396 	 * transaction cannot have been committed to anyone (we would have
397 	 * waited for all writes to be stable first), so it is semantically
398 	 * correct to declare this the end of the log.
399 	 */
400 	if (lr->lr_blkptr.blk_birth >= first_txg &&
401 	    (error = zil_read_log_data(zilog, lr, NULL)) != 0)
402 		return (error);
403 	return (zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg));
404 }
405 
406 /* ARGSUSED */
407 static int
408 zil_free_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t claim_txg)
409 {
410 	zio_free_zil(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
411 
412 	return (0);
413 }
414 
415 static int
416 zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
417 {
418 	lr_write_t *lr = (lr_write_t *)lrc;
419 	blkptr_t *bp = &lr->lr_blkptr;
420 
421 	/*
422 	 * If we previously claimed it, we need to free it.
423 	 */
424 	if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE &&
425 	    bp->blk_birth >= claim_txg && zil_bp_tree_add(zilog, bp) == 0 &&
426 	    !BP_IS_HOLE(bp))
427 		zio_free(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
428 
429 	return (0);
430 }
431 
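/*
 * Allocate a log write buffer (lwb) for the log block bp, allocate its
 * data buffer, and append it to the zilog's list of lwbs.
 */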
432 static lwb_t *
433 zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, uint64_t txg)
434 {
435 	lwb_t *lwb;
436 
437 	lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
438 	lwb->lwb_zilog = zilog;
439 	lwb->lwb_blk = *bp;
440 	lwb->lwb_buf = zio_buf_alloc(BP_GET_LSIZE(bp));
441 	lwb->lwb_max_txg = txg;
442 	lwb->lwb_zio = NULL;
443 	lwb->lwb_tx = NULL;
444 	if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
445 		lwb->lwb_nused = sizeof (zil_chain_t);
446 		lwb->lwb_sz = BP_GET_LSIZE(bp);
447 	} else {
448 		lwb->lwb_nused = 0;
449 		lwb->lwb_sz = BP_GET_LSIZE(bp) - sizeof (zil_chain_t);
450 	}
451 
452 	mutex_enter(&zilog->zl_lock);
453 	list_insert_tail(&zilog->zl_lwb_list, lwb);
454 	mutex_exit(&zilog->zl_lock);
455 
456 	return (lwb);
457 }
458 
459 /*
460  * Called when we create in-memory log transactions so that we know
461  * to cleanup the itxs at the end of spa_sync().
462  */
463 void
464 zilog_dirty(zilog_t *zilog, uint64_t txg)
465 {
466 	dsl_pool_t *dp = zilog->zl_dmu_pool;
467 	dsl_dataset_t *ds = dmu_objset_ds(zilog->zl_os);
468 
469 	if (ds->ds_is_snapshot)
470 		panic("dirtying snapshot!");
471 
472 	if (txg_list_add(&dp->dp_dirty_zilogs, zilog, txg)) {
473 		/* up the hold count until we can be written out */
474 		dmu_buf_add_ref(ds->ds_dbuf, zilog);
475 	}
476 }
477 
478 /*
479  * Determine if the zil is dirty in the specified txg. Callers wanting to
480  * ensure that the dirty state does not change must hold the itxg_lock for
481  * the specified txg. Holding the lock will ensure that the zil cannot be
482  * dirtied (zil_itx_assign) or cleaned (zil_clean) while we check its current
483  * state.
484  */
485 boolean_t
486 zilog_is_dirty_in_txg(zilog_t *zilog, uint64_t txg)
487 {
488 	dsl_pool_t *dp = zilog->zl_dmu_pool;
489 
490 	if (txg_list_member(&dp->dp_dirty_zilogs, zilog, txg & TXG_MASK))
491 		return (B_TRUE);
492 	return (B_FALSE);
493 }
494 
495 /*
496  * Determine if the zil is dirty. The zil is considered dirty if it has
497  * any pending itx records that have not been cleaned by zil_clean().
498  */
499 boolean_t
500 zilog_is_dirty(zilog_t *zilog)
501 {
502 	dsl_pool_t *dp = zilog->zl_dmu_pool;
503 
504 	for (int t = 0; t < TXG_SIZE; t++) {
505 		if (txg_list_member(&dp->dp_dirty_zilogs, zilog, t))
506 			return (B_TRUE);
507 	}
508 	return (B_FALSE);
509 }
510 
511 /*
512  * Create an on-disk intent log.
513  */
514 static lwb_t *
515 zil_create(zilog_t *zilog)
516 {
517 	const zil_header_t *zh = zilog->zl_header;
518 	lwb_t *lwb = NULL;
519 	uint64_t txg = 0;
520 	dmu_tx_t *tx = NULL;
521 	blkptr_t blk;
522 	int error = 0;
523 
524 	/*
525 	 * Wait for any previous destroy to complete.
526 	 */
527 	txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
528 
529 	ASSERT(zh->zh_claim_txg == 0);
530 	ASSERT(zh->zh_replay_seq == 0);
531 
532 	blk = zh->zh_log;
533 
534 	/*
535 	 * Allocate an initial log block if:
536 	 *    - there isn't one already
537 	 *    - the existing block is the wrong endianness
538 	 */
539 	if (BP_IS_HOLE(&blk) || BP_SHOULD_BYTESWAP(&blk)) {
540 		tx = dmu_tx_create(zilog->zl_os);
541 		VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
542 		dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
543 		txg = dmu_tx_get_txg(tx);
544 
545 		if (!BP_IS_HOLE(&blk)) {
546 			zio_free_zil(zilog->zl_spa, txg, &blk);
547 			BP_ZERO(&blk);
548 		}
549 
550 		error = zio_alloc_zil(zilog->zl_spa, txg, &blk, NULL,
551 		    ZIL_MIN_BLKSZ, zilog->zl_logbias == ZFS_LOGBIAS_LATENCY);
552 
553 		if (error == 0)
554 			zil_init_log_chain(zilog, &blk);
555 	}
556 
557 	/*
558 	 * Allocate a log write buffer (lwb) for the first log block.
559 	 */
560 	if (error == 0)
561 		lwb = zil_alloc_lwb(zilog, &blk, txg);
562 
563 	/*
564 	 * If we just allocated the first log block, commit our transaction
565 	 * and wait for zil_sync() to stuff the block pointer into zh_log.
566 	 * (zh is part of the MOS, so we cannot modify it in open context.)
567 	 */
568 	if (tx != NULL) {
569 		dmu_tx_commit(tx);
570 		txg_wait_synced(zilog->zl_dmu_pool, txg);
571 	}
572 
573 	ASSERT(bcmp(&blk, &zh->zh_log, sizeof (blk)) == 0);
574 
575 	return (lwb);
576 }
577 
578 /*
579  * In one tx, free all log blocks and clear the log header.
580  * If keep_first is set, then we're replaying a log with no content.
581  * We want to keep the first block, however, so that the first
582  * synchronous transaction doesn't require a txg_wait_synced()
583  * in zil_create().  We don't need to txg_wait_synced() here either
584  * when keep_first is set, because both zil_create() and zil_destroy()
585  * will wait for any in-progress destroys to complete.
586  */
587 void
588 zil_destroy(zilog_t *zilog, boolean_t keep_first)
589 {
590 	const zil_header_t *zh = zilog->zl_header;
591 	lwb_t *lwb;
592 	dmu_tx_t *tx;
593 	uint64_t txg;
594 
595 	/*
596 	 * Wait for any previous destroy to complete.
597 	 */
598 	txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
599 
600 	zilog->zl_old_header = *zh;		/* debugging aid */
601 
602 	if (BP_IS_HOLE(&zh->zh_log))
603 		return;
604 
605 	tx = dmu_tx_create(zilog->zl_os);
606 	VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
607 	dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
608 	txg = dmu_tx_get_txg(tx);
609 
610 	mutex_enter(&zilog->zl_lock);
611 
612 	ASSERT3U(zilog->zl_destroy_txg, <, txg);
613 	zilog->zl_destroy_txg = txg;
614 	zilog->zl_keep_first = keep_first;
615 
616 	if (!list_is_empty(&zilog->zl_lwb_list)) {
617 		ASSERT(zh->zh_claim_txg == 0);
618 		VERIFY(!keep_first);
619 		while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
620 			list_remove(&zilog->zl_lwb_list, lwb);
621 			if (lwb->lwb_buf != NULL)
622 				zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
623 			zio_free_zil(zilog->zl_spa, txg, &lwb->lwb_blk);
624 			kmem_cache_free(zil_lwb_cache, lwb);
625 		}
626 	} else if (!keep_first) {
627 		zil_destroy_sync(zilog, tx);
628 	}
629 	mutex_exit(&zilog->zl_lock);
630 
631 	dmu_tx_commit(tx);
632 }
633 
634 void
635 zil_destroy_sync(zilog_t *zilog, dmu_tx_t *tx)
636 {
637 	ASSERT(list_is_empty(&zilog->zl_lwb_list));
638 	(void) zil_parse(zilog, zil_free_log_block,
639 	    zil_free_log_record, tx, zilog->zl_header->zh_claim_txg);
640 }
641 
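/*
 * Claim the intent log blocks for this dataset in the first txg so they
 * are not reallocated before the log can be replayed or destroyed, and
 * record the highest claimed block and record sequence numbers in the
 * ZIL header.
 */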
642 int
643 zil_claim(dsl_pool_t *dp, dsl_dataset_t *ds, void *txarg)
644 {
645 	dmu_tx_t *tx = txarg;
646 	uint64_t first_txg = dmu_tx_get_txg(tx);
647 	zilog_t *zilog;
648 	zil_header_t *zh;
649 	objset_t *os;
650 	int error;
651 
652 	error = dmu_objset_own_obj(dp, ds->ds_object,
653 	    DMU_OST_ANY, B_FALSE, FTAG, &os);
654 	if (error != 0) {
655 		/*
656 		 * EBUSY indicates that the objset is inconsistent, in which
657 		 * case it cannot have a ZIL.
658 		 */
659 		if (error != EBUSY) {
660 			cmn_err(CE_WARN, "can't open objset for %llu, error %u",
661 			    (unsigned long long)ds->ds_object, error);
662 		}
663 		return (0);
664 	}
665 
666 	zilog = dmu_objset_zil(os);
667 	zh = zil_header_in_syncing_context(zilog);
668 
669 	if (spa_get_log_state(zilog->zl_spa) == SPA_LOG_CLEAR) {
670 		if (!BP_IS_HOLE(&zh->zh_log))
671 			zio_free_zil(zilog->zl_spa, first_txg, &zh->zh_log);
672 		BP_ZERO(&zh->zh_log);
673 		dsl_dataset_dirty(dmu_objset_ds(os), tx);
674 		dmu_objset_disown(os, FTAG);
675 		return (0);
676 	}
677 
678 	/*
679 	 * Claim all log blocks if we haven't already done so, and remember
680 	 * the highest claimed sequence number.  This ensures that if we can
681 	 * read only part of the log now (e.g. due to a missing device),
682 	 * but we can read the entire log later, we will not try to replay
683 	 * or destroy beyond the last block we successfully claimed.
684 	 */
685 	ASSERT3U(zh->zh_claim_txg, <=, first_txg);
686 	if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
687 		(void) zil_parse(zilog, zil_claim_log_block,
688 		    zil_claim_log_record, tx, first_txg);
689 		zh->zh_claim_txg = first_txg;
690 		zh->zh_claim_blk_seq = zilog->zl_parse_blk_seq;
691 		zh->zh_claim_lr_seq = zilog->zl_parse_lr_seq;
692 		if (zilog->zl_parse_lr_count || zilog->zl_parse_blk_count > 1)
693 			zh->zh_flags |= ZIL_REPLAY_NEEDED;
694 		zh->zh_flags |= ZIL_CLAIM_LR_SEQ_VALID;
695 		dsl_dataset_dirty(dmu_objset_ds(os), tx);
696 	}
697 
698 	ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
699 	dmu_objset_disown(os, FTAG);
700 	return (0);
701 }
702 
703 /*
704  * Check the log by walking the log chain.
705  * Checksum errors are ok as they indicate the end of the chain.
706  * Any other error (no device or read failure) returns an error.
707  */
708 /* ARGSUSED */
709 int
710 zil_check_log_chain(dsl_pool_t *dp, dsl_dataset_t *ds, void *tx)
711 {
712 	zilog_t *zilog;
713 	objset_t *os;
714 	blkptr_t *bp;
715 	int error;
716 
717 	ASSERT(tx == NULL);
718 
719 	error = dmu_objset_from_ds(ds, &os);
720 	if (error != 0) {
721 		cmn_err(CE_WARN, "can't open objset %llu, error %d",
722 		    (unsigned long long)ds->ds_object, error);
723 		return (0);
724 	}
725 
726 	zilog = dmu_objset_zil(os);
727 	bp = (blkptr_t *)&zilog->zl_header->zh_log;
728 
729 	/*
730 	 * Check the first block and determine if it's on a log device
731 	 * which may have been removed or faulted prior to loading this
732 	 * pool.  If so, there's no point in checking the rest of the log
733 	 * as its content should have already been synced to the pool.
734 	 */
735 	if (!BP_IS_HOLE(bp)) {
736 		vdev_t *vd;
737 		boolean_t valid = B_TRUE;
738 
739 		spa_config_enter(os->os_spa, SCL_STATE, FTAG, RW_READER);
740 		vd = vdev_lookup_top(os->os_spa, DVA_GET_VDEV(&bp->blk_dva[0]));
741 		if (vd->vdev_islog && vdev_is_dead(vd))
742 			valid = vdev_log_state_valid(vd);
743 		spa_config_exit(os->os_spa, SCL_STATE, FTAG);
744 
745 		if (!valid)
746 			return (0);
747 	}
748 
749 	/*
750 	 * Because tx == NULL, zil_claim_log_block() will not actually claim
751 	 * any blocks, but just determine whether it is possible to do so.
752 	 * In addition to checking the log chain, zil_claim_log_block()
753 	 * will invoke zio_claim() with a done func of spa_claim_notify(),
754 	 * which will update spa_max_claim_txg.  See spa_load() for details.
755 	 */
756 	error = zil_parse(zilog, zil_claim_log_block, zil_claim_log_record, tx,
757 	    zilog->zl_header->zh_claim_txg ? -1ULL : spa_first_txg(os->os_spa));
758 
759 	return ((error == ECKSUM || error == ENOENT) ? 0 : error);
760 }
761 
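/*
 * AVL comparator for the zl_vdev_tree: order nodes by top-level vdev id.
 */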
762 static int
763 zil_vdev_compare(const void *x1, const void *x2)
764 {
765 	const uint64_t v1 = ((zil_vdev_node_t *)x1)->zv_vdev;
766 	const uint64_t v2 = ((zil_vdev_node_t *)x2)->zv_vdev;
767 
768 	if (v1 < v2)
769 		return (-1);
770 	if (v1 > v2)
771 		return (1);
772 
773 	return (0);
774 }
775 
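/*
 * Record the top-level vdevs referenced by bp in the zl_vdev_tree so that
 * zil_flush_vdevs() can flush their write caches once the log writes have
 * completed.  This is a no-op when zfs_nocacheflush is set.
 */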
776 void
777 zil_add_block(zilog_t *zilog, const blkptr_t *bp)
778 {
779 	avl_tree_t *t = &zilog->zl_vdev_tree;
780 	avl_index_t where;
781 	zil_vdev_node_t *zv, zvsearch;
782 	int ndvas = BP_GET_NDVAS(bp);
783 	int i;
784 
785 	if (zfs_nocacheflush)
786 		return;
787 
788 	ASSERT(zilog->zl_writer);
789 
790 	/*
791 	 * Even though we're zl_writer, we still need a lock because the
792 	 * zl_get_data() callbacks may have dmu_sync() done callbacks
793 	 * that will run concurrently.
794 	 */
795 	mutex_enter(&zilog->zl_vdev_lock);
796 	for (i = 0; i < ndvas; i++) {
797 		zvsearch.zv_vdev = DVA_GET_VDEV(&bp->blk_dva[i]);
798 		if (avl_find(t, &zvsearch, &where) == NULL) {
799 			zv = kmem_alloc(sizeof (*zv), KM_SLEEP);
800 			zv->zv_vdev = zvsearch.zv_vdev;
801 			avl_insert(t, zv, where);
802 		}
803 	}
804 	mutex_exit(&zilog->zl_vdev_lock);
805 }
806 
807 static void
808 zil_flush_vdevs(zilog_t *zilog)
809 {
810 	spa_t *spa = zilog->zl_spa;
811 	avl_tree_t *t = &zilog->zl_vdev_tree;
812 	void *cookie = NULL;
813 	zil_vdev_node_t *zv;
814 	zio_t *zio;
815 
816 	ASSERT(zilog->zl_writer);
817 
818 	/*
819 	 * We don't need zl_vdev_lock here because we're the zl_writer,
820 	 * and all zl_get_data() callbacks are done.
821 	 */
822 	if (avl_numnodes(t) == 0)
823 		return;
824 
825 	spa_config_enter(spa, SCL_STATE, FTAG, RW_READER);
826 
827 	zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
828 
829 	while ((zv = avl_destroy_nodes(t, &cookie)) != NULL) {
830 		vdev_t *vd = vdev_lookup_top(spa, zv->zv_vdev);
831 		if (vd != NULL)
832 			zio_flush(zio, vd);
833 		kmem_free(zv, sizeof (*zv));
834 	}
835 
836 	/*
837 	 * Wait for all the flushes to complete.  Not all devices actually
838 	 * support the DKIOCFLUSHWRITECACHE ioctl, so it's OK if it fails.
839 	 */
840 	(void) zio_wait(zio);
841 
842 	spa_config_exit(spa, SCL_STATE, FTAG);
843 }
844 
845 /*
846  * Function called when a log block write completes
847  */
848 static void
849 zil_lwb_write_done(zio_t *zio)
850 {
851 	lwb_t *lwb = zio->io_private;
852 	zilog_t *zilog = lwb->lwb_zilog;
853 	dmu_tx_t *tx = lwb->lwb_tx;
854 
855 	ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
856 	ASSERT(BP_GET_TYPE(zio->io_bp) == DMU_OT_INTENT_LOG);
857 	ASSERT(BP_GET_LEVEL(zio->io_bp) == 0);
858 	ASSERT(BP_GET_BYTEORDER(zio->io_bp) == ZFS_HOST_BYTEORDER);
859 	ASSERT(!BP_IS_GANG(zio->io_bp));
860 	ASSERT(!BP_IS_HOLE(zio->io_bp));
861 	ASSERT(BP_GET_FILL(zio->io_bp) == 0);
862 
863 	/*
864 	 * Ensure the lwb buffer pointer is cleared before releasing
865 	 * the txg. If we have had an allocation failure and
866 	 * the txg is waiting to sync then we want zil_sync()
867 	 * to remove the lwb so that it's not picked up as the next new
868 	 * one in zil_commit_writer(). zil_sync() will only remove
869 	 * the lwb if lwb_buf is null.
870 	 */
871 	abd_put(zio->io_abd);
872 	zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
873 	mutex_enter(&zilog->zl_lock);
874 	lwb->lwb_buf = NULL;
875 	lwb->lwb_tx = NULL;
876 	mutex_exit(&zilog->zl_lock);
877 
878 	/*
879 	 * Now that we've written this log block, we have a stable pointer
880 	 * to the next block in the chain, so it's OK to let the txg in
881 	 * which we allocated the next block sync.
882 	 */
883 	dmu_tx_commit(tx);
884 }
885 
886 /*
887  * Initialize the io for a log block.
888  */
889 static void
890 zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
891 {
892 	zbookmark_phys_t zb;
893 
894 	SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
895 	    ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
896 	    lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ]);
897 
898 	if (zilog->zl_root_zio == NULL) {
899 		zilog->zl_root_zio = zio_root(zilog->zl_spa, NULL, NULL,
900 		    ZIO_FLAG_CANFAIL);
901 	}
902 	if (lwb->lwb_zio == NULL) {
903 		abd_t *lwb_abd = abd_get_from_buf(lwb->lwb_buf,
904 		    BP_GET_LSIZE(&lwb->lwb_blk));
905 		lwb->lwb_zio = zio_rewrite(zilog->zl_root_zio, zilog->zl_spa,
906 		    0, &lwb->lwb_blk, lwb_abd, BP_GET_LSIZE(&lwb->lwb_blk),
907 		    zil_lwb_write_done, lwb, ZIO_PRIORITY_SYNC_WRITE,
908 		    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE, &zb);
909 	}
910 }
911 
912 /*
913  * Define a limited set of intent log block sizes.
914  *
915  * These must be a multiple of 4KB. Note only the amount used (again
916  * aligned to 4KB) actually gets written. However, we can't always just
917  * allocate SPA_OLD_MAXBLOCKSIZE as the slog space could be exhausted.
918  */
919 uint64_t zil_block_buckets[] = {
920     4096,		/* non TX_WRITE */
921     8192+4096,		/* data base */
922     32*1024 + 4096, 	/* NFS writes */
923     UINT64_MAX
924 };
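
/*
 * For example, zil_lwb_write_start() below picks the smallest bucket that
 * is at least zl_cur_used + sizeof (zil_chain_t), so roughly 20K of pending
 * records selects the 32K+4K bucket, and anything larger than 36K maps to
 * UINT64_MAX and is then clamped to SPA_OLD_MAXBLOCKSIZE.
 */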
925 
926 /*
927  * Use the slog as long as the logbias is 'latency' and the current commit size
928  * is less than the limit or the total list size is less than 2X the limit.
929  * Limit checking is disabled by setting zil_slog_limit to UINT64_MAX.
930  */
931 uint64_t zil_slog_limit = 1024 * 1024;
932 #define	USE_SLOG(zilog) (((zilog)->zl_logbias == ZFS_LOGBIAS_LATENCY) && \
933 	(((zilog)->zl_cur_used < zil_slog_limit) || \
934 	((zilog)->zl_itx_list_sz < (zil_slog_limit << 1))))
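
/*
 * For example, with the default 1M zil_slog_limit, a latency-biased zilog
 * with 512K of outstanding commit data uses the slog, while one whose
 * current commit size exceeds 1M and whose total itx list size exceeds 2M
 * falls back to the main pool devices.
 */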
935 
936 /*
937  * Start a log block write and advance to the next log block.
938  * Calls are serialized.
939  */
940 static lwb_t *
941 zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
942 {
943 	lwb_t *nlwb = NULL;
944 	zil_chain_t *zilc;
945 	spa_t *spa = zilog->zl_spa;
946 	blkptr_t *bp;
947 	dmu_tx_t *tx;
948 	uint64_t txg;
949 	uint64_t zil_blksz, wsz;
950 	int i, error;
951 
952 	if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
953 		zilc = (zil_chain_t *)lwb->lwb_buf;
954 		bp = &zilc->zc_next_blk;
955 	} else {
956 		zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz);
957 		bp = &zilc->zc_next_blk;
958 	}
959 
960 	ASSERT(lwb->lwb_nused <= lwb->lwb_sz);
961 
962 	/*
963 	 * Allocate the next block and save its address in this block
964 	 * before writing it in order to establish the log chain.
965 	 * Note that if the allocation of nlwb synced before we wrote
966 	 * the block that points at it (lwb), we'd leak it if we crashed.
967 	 * Therefore, we don't do dmu_tx_commit() until zil_lwb_write_done().
968 	 * We dirty the dataset to ensure that zil_sync() will be called
969 	 * to clean up in the event of allocation failure or I/O failure.
970 	 */
971 	tx = dmu_tx_create(zilog->zl_os);
972 	VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
973 	dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
974 	txg = dmu_tx_get_txg(tx);
975 
976 	lwb->lwb_tx = tx;
977 
978 	/*
979 	 * Log blocks are pre-allocated. Here we select the size of the next
980 	 * block, based on size used in the last block.
981 	 * - first find the smallest bucket that will fit the block from a
982 	 *   limited set of block sizes. This is because it's faster to write
983 	 *   blocks allocated from the same metaslab as they are adjacent or
984 	 *   close.
985 	 * - next find the maximum from the new suggested size and an array of
986 	 *   previous sizes. This lessens a picket fence effect of wrongly
987 	 *   guessing the size if we have a stream of say 2k, 64k, 2k, 64k
988 	 *   requests.
989 	 *
990 	 * Note we only write what is used, but we can't just allocate
991 	 * the maximum block size because we can exhaust the available
992 	 * pool log space.
993 	 */
994 	zil_blksz = zilog->zl_cur_used + sizeof (zil_chain_t);
995 	for (i = 0; zil_blksz > zil_block_buckets[i]; i++)
996 		continue;
997 	zil_blksz = zil_block_buckets[i];
998 	if (zil_blksz == UINT64_MAX)
999 		zil_blksz = SPA_OLD_MAXBLOCKSIZE;
1000 	zilog->zl_prev_blks[zilog->zl_prev_rotor] = zil_blksz;
1001 	for (i = 0; i < ZIL_PREV_BLKS; i++)
1002 		zil_blksz = MAX(zil_blksz, zilog->zl_prev_blks[i]);
1003 	zilog->zl_prev_rotor = (zilog->zl_prev_rotor + 1) & (ZIL_PREV_BLKS - 1);
1004 
1005 	BP_ZERO(bp);
1006 	/* pass the old blkptr in order to spread log blocks across devs */
1007 	error = zio_alloc_zil(spa, txg, bp, &lwb->lwb_blk, zil_blksz,
1008 	    USE_SLOG(zilog));
1009 	if (error == 0) {
1010 		ASSERT3U(bp->blk_birth, ==, txg);
1011 		bp->blk_cksum = lwb->lwb_blk.blk_cksum;
1012 		bp->blk_cksum.zc_word[ZIL_ZC_SEQ]++;
1013 
1014 		/*
1015 		 * Allocate a new log write buffer (lwb).
1016 		 */
1017 		nlwb = zil_alloc_lwb(zilog, bp, txg);
1018 
1019 		/* Record the block for later vdev flushing */
1020 		zil_add_block(zilog, &lwb->lwb_blk);
1021 	}
1022 
1023 	if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
1024 		/* For Slim ZIL only write what is used. */
1025 		wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ, uint64_t);
1026 		ASSERT3U(wsz, <=, lwb->lwb_sz);
1027 		zio_shrink(lwb->lwb_zio, wsz);
1028 
1029 	} else {
1030 		wsz = lwb->lwb_sz;
1031 	}
1032 
1033 	zilc->zc_pad = 0;
1034 	zilc->zc_nused = lwb->lwb_nused;
1035 	zilc->zc_eck.zec_cksum = lwb->lwb_blk.blk_cksum;
1036 
1037 	/*
1038 	 * clear unused data for security
1039 	 */
1040 	bzero(lwb->lwb_buf + lwb->lwb_nused, wsz - lwb->lwb_nused);
1041 
1042 	zio_nowait(lwb->lwb_zio); /* Kick off the write for the old log block */
1043 
1044 	/*
1045 	 * If there was an allocation failure then nlwb will be null which
1046 	 * forces a txg_wait_synced().
1047 	 */
1048 	return (nlwb);
1049 }
1050 
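/*
 * Copy an itx's log record (and, for WR_NEED_COPY writes, its data) into
 * the current log write buffer, starting a new log block first if the
 * record doesn't fit.  Returns the lwb to use for the next record, or NULL
 * if there is no usable lwb (e.g. after an allocation failure), in which
 * case the commit path falls back to txg_wait_synced().
 */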
1051 static lwb_t *
1052 zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
1053 {
1054 	lr_t *lrc = &itx->itx_lr; /* common log record */
1055 	lr_write_t *lrw = (lr_write_t *)lrc;
1056 	char *lr_buf;
1057 	uint64_t txg = lrc->lrc_txg;
1058 	uint64_t reclen = lrc->lrc_reclen;
1059 	uint64_t dlen = 0;
1060 
1061 	if (lwb == NULL)
1062 		return (NULL);
1063 
1064 	ASSERT(lwb->lwb_buf != NULL);
1065 
1066 	if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY)
1067 		dlen = P2ROUNDUP_TYPED(
1068 		    lrw->lr_length, sizeof (uint64_t), uint64_t);
1069 
1070 	zilog->zl_cur_used += (reclen + dlen);
1071 
1072 	zil_lwb_write_init(zilog, lwb);
1073 
1074 	/*
1075 	 * If this record won't fit in the current log block, start a new one.
1076 	 */
1077 	if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
1078 		lwb = zil_lwb_write_start(zilog, lwb);
1079 		if (lwb == NULL)
1080 			return (NULL);
1081 		zil_lwb_write_init(zilog, lwb);
1082 		ASSERT(LWB_EMPTY(lwb));
1083 		if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
1084 			txg_wait_synced(zilog->zl_dmu_pool, txg);
1085 			return (lwb);
1086 		}
1087 	}
1088 
1089 	lr_buf = lwb->lwb_buf + lwb->lwb_nused;
1090 	bcopy(lrc, lr_buf, reclen);
1091 	lrc = (lr_t *)lr_buf;
1092 	lrw = (lr_write_t *)lrc;
1093 
1094 	/*
1095 	 * If it's a write, fetch the data or get its blkptr as appropriate.
1096 	 */
1097 	if (lrc->lrc_txtype == TX_WRITE) {
1098 		if (txg > spa_freeze_txg(zilog->zl_spa))
1099 			txg_wait_synced(zilog->zl_dmu_pool, txg);
1100 		if (itx->itx_wr_state != WR_COPIED) {
1101 			char *dbuf;
1102 			int error;
1103 
1104 			if (dlen) {
1105 				ASSERT(itx->itx_wr_state == WR_NEED_COPY);
1106 				dbuf = lr_buf + reclen;
1107 				lrw->lr_common.lrc_reclen += dlen;
1108 			} else {
1109 				ASSERT(itx->itx_wr_state == WR_INDIRECT);
1110 				dbuf = NULL;
1111 			}
1112 			error = zilog->zl_get_data(
1113 			    itx->itx_private, lrw, dbuf, lwb->lwb_zio);
1114 			if (error == EIO) {
1115 				txg_wait_synced(zilog->zl_dmu_pool, txg);
1116 				return (lwb);
1117 			}
1118 			if (error != 0) {
1119 				ASSERT(error == ENOENT || error == EEXIST ||
1120 				    error == EALREADY);
1121 				return (lwb);
1122 			}
1123 		}
1124 	}
1125 
1126 	/*
1127 	 * We're actually making an entry, so update lrc_seq to be the
1128 	 * log record sequence number.  Note that this is generally not
1129 	 * equal to the itx sequence number because not all transactions
1130 	 * are synchronous, and sometimes spa_sync() gets there first.
1131 	 */
1132 	lrc->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
1133 	lwb->lwb_nused += reclen + dlen;
1134 	lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
1135 	ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz);
1136 	ASSERT0(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)));
1137 
1138 	return (lwb);
1139 }
1140 
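/*
 * Allocate an in-memory intent log transaction (itx) with room for a log
 * record of lrsize bytes, rounded up to an 8-byte multiple.
 */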
1141 itx_t *
1142 zil_itx_create(uint64_t txtype, size_t lrsize)
1143 {
1144 	itx_t *itx;
1145 
1146 	lrsize = P2ROUNDUP_TYPED(lrsize, sizeof (uint64_t), size_t);
1147 
1148 	itx = kmem_alloc(offsetof(itx_t, itx_lr) + lrsize, KM_SLEEP);
1149 	itx->itx_lr.lrc_txtype = txtype;
1150 	itx->itx_lr.lrc_reclen = lrsize;
1151 	itx->itx_sod = lrsize; /* if write & WR_NEED_COPY will be increased */
1152 	itx->itx_lr.lrc_seq = 0;	/* defensive */
1153 	itx->itx_sync = B_TRUE;		/* default is synchronous */
1154 
1155 	return (itx);
1156 }
1157 
1158 void
1159 zil_itx_destroy(itx_t *itx)
1160 {
1161 	kmem_free(itx, offsetof(itx_t, itx_lr) + itx->itx_lr.lrc_reclen);
1162 }
1163 
1164 /*
1165  * Free up the sync and async itxs. The itxs_t has already been detached
1166  * so no locks are needed.
1167  */
1168 static void
1169 zil_itxg_clean(itxs_t *itxs)
1170 {
1171 	itx_t *itx;
1172 	list_t *list;
1173 	avl_tree_t *t;
1174 	void *cookie;
1175 	itx_async_node_t *ian;
1176 
1177 	list = &itxs->i_sync_list;
1178 	while ((itx = list_head(list)) != NULL) {
1179 		list_remove(list, itx);
1180 		kmem_free(itx, offsetof(itx_t, itx_lr) +
1181 		    itx->itx_lr.lrc_reclen);
1182 	}
1183 
1184 	cookie = NULL;
1185 	t = &itxs->i_async_tree;
1186 	while ((ian = avl_destroy_nodes(t, &cookie)) != NULL) {
1187 		list = &ian->ia_list;
1188 		while ((itx = list_head(list)) != NULL) {
1189 			list_remove(list, itx);
1190 			kmem_free(itx, offsetof(itx_t, itx_lr) +
1191 			    itx->itx_lr.lrc_reclen);
1192 		}
1193 		list_destroy(list);
1194 		kmem_free(ian, sizeof (itx_async_node_t));
1195 	}
1196 	avl_destroy(t);
1197 
1198 	kmem_free(itxs, sizeof (itxs_t));
1199 }
1200 
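/*
 * AVL comparator for the per-txg async itx tree: order nodes by object id.
 */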
1201 static int
1202 zil_aitx_compare(const void *x1, const void *x2)
1203 {
1204 	const uint64_t o1 = ((itx_async_node_t *)x1)->ia_foid;
1205 	const uint64_t o2 = ((itx_async_node_t *)x2)->ia_foid;
1206 
1207 	if (o1 < o2)
1208 		return (-1);
1209 	if (o1 > o2)
1210 		return (1);
1211 
1212 	return (0);
1213 }
1214 
1215 /*
1216  * Remove all async itx with the given oid.
1217  */
1218 static void
1219 zil_remove_async(zilog_t *zilog, uint64_t oid)
1220 {
1221 	uint64_t otxg, txg;
1222 	itx_async_node_t *ian;
1223 	avl_tree_t *t;
1224 	avl_index_t where;
1225 	list_t clean_list;
1226 	itx_t *itx;
1227 
1228 	ASSERT(oid != 0);
1229 	list_create(&clean_list, sizeof (itx_t), offsetof(itx_t, itx_node));
1230 
1231 	if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
1232 		otxg = ZILTEST_TXG;
1233 	else
1234 		otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
1235 
1236 	for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
1237 		itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
1238 
1239 		mutex_enter(&itxg->itxg_lock);
1240 		if (itxg->itxg_txg != txg) {
1241 			mutex_exit(&itxg->itxg_lock);
1242 			continue;
1243 		}
1244 
1245 		/*
1246 		 * Locate the object node and append its list.
1247 		 */
1248 		t = &itxg->itxg_itxs->i_async_tree;
1249 		ian = avl_find(t, &oid, &where);
1250 		if (ian != NULL)
1251 			list_move_tail(&clean_list, &ian->ia_list);
1252 		mutex_exit(&itxg->itxg_lock);
1253 	}
1254 	while ((itx = list_head(&clean_list)) != NULL) {
1255 		list_remove(&clean_list, itx);
1256 		kmem_free(itx, offsetof(itx_t, itx_lr) +
1257 		    itx->itx_lr.lrc_reclen);
1258 	}
1259 	list_destroy(&clean_list);
1260 }
1261 
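/*
 * Assign an itx to the in-memory itx lists for the txg in which its dmu
 * transaction will commit.  Synchronous itxs go on the per-txg sync list;
 * asynchronous itxs are grouped by object id in the per-txg async tree.
 */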
1262 void
1263 zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
1264 {
1265 	uint64_t txg;
1266 	itxg_t *itxg;
1267 	itxs_t *itxs, *clean = NULL;
1268 
1269 	/*
1270 	 * Object ids can be re-instantiated in the next txg so
1271 	 * remove any async transactions to avoid future leaks.
1272 	 * This can happen if a fsync occurs on the re-instantiated
1273 	 * object for a WR_INDIRECT or WR_NEED_COPY write, which gets
1274 	 * the new file data and flushes a write record for the old object.
1275 	 */
1276 	if ((itx->itx_lr.lrc_txtype & ~TX_CI) == TX_REMOVE)
1277 		zil_remove_async(zilog, itx->itx_oid);
1278 
1279 	/*
1280 	 * Ensure the data of a renamed file is committed before the rename.
1281 	 */
1282 	if ((itx->itx_lr.lrc_txtype & ~TX_CI) == TX_RENAME)
1283 		zil_async_to_sync(zilog, itx->itx_oid);
1284 
1285 	if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX)
1286 		txg = ZILTEST_TXG;
1287 	else
1288 		txg = dmu_tx_get_txg(tx);
1289 
1290 	itxg = &zilog->zl_itxg[txg & TXG_MASK];
1291 	mutex_enter(&itxg->itxg_lock);
1292 	itxs = itxg->itxg_itxs;
1293 	if (itxg->itxg_txg != txg) {
1294 		if (itxs != NULL) {
1295 			/*
1296 			 * The zil_clean callback hasn't got around to cleaning
1297 			 * this itxg. Save the itxs for release below.
1298 			 * This should be rare.
1299 			 */
1300 			zfs_dbgmsg("zil_itx_assign: missed itx cleanup for "
1301 			    "txg %llu", itxg->itxg_txg);
1302 			atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
1303 			itxg->itxg_sod = 0;
1304 			clean = itxg->itxg_itxs;
1305 		}
1306 		ASSERT(itxg->itxg_sod == 0);
1307 		itxg->itxg_txg = txg;
1308 		itxs = itxg->itxg_itxs = kmem_zalloc(sizeof (itxs_t), KM_SLEEP);
1309 
1310 		list_create(&itxs->i_sync_list, sizeof (itx_t),
1311 		    offsetof(itx_t, itx_node));
1312 		avl_create(&itxs->i_async_tree, zil_aitx_compare,
1313 		    sizeof (itx_async_node_t),
1314 		    offsetof(itx_async_node_t, ia_node));
1315 	}
1316 	if (itx->itx_sync) {
1317 		list_insert_tail(&itxs->i_sync_list, itx);
1318 		atomic_add_64(&zilog->zl_itx_list_sz, itx->itx_sod);
1319 		itxg->itxg_sod += itx->itx_sod;
1320 	} else {
1321 		avl_tree_t *t = &itxs->i_async_tree;
1322 		uint64_t foid = ((lr_ooo_t *)&itx->itx_lr)->lr_foid;
1323 		itx_async_node_t *ian;
1324 		avl_index_t where;
1325 
1326 		ian = avl_find(t, &foid, &where);
1327 		if (ian == NULL) {
1328 			ian = kmem_alloc(sizeof (itx_async_node_t), KM_SLEEP);
1329 			list_create(&ian->ia_list, sizeof (itx_t),
1330 			    offsetof(itx_t, itx_node));
1331 			ian->ia_foid = foid;
1332 			avl_insert(t, ian, where);
1333 		}
1334 		list_insert_tail(&ian->ia_list, itx);
1335 	}
1336 
1337 	itx->itx_lr.lrc_txg = dmu_tx_get_txg(tx);
1338 	zilog_dirty(zilog, txg);
1339 	mutex_exit(&itxg->itxg_lock);
1340 
1341 	/* Release the old itxs now we've dropped the lock */
1342 	if (clean != NULL)
1343 		zil_itxg_clean(clean);
1344 }
1345 
1346 /*
1347  * If there are any in-memory intent log transactions which have now been
1348  * synced then start up a taskq to free them. We should only do this after we
1349  * have written out the uberblocks (i.e. the txg has been committed) so that we
1350  * don't inadvertently clean out in-memory log records that would be required
1351  * by zil_commit().
1352  */
1353 void
1354 zil_clean(zilog_t *zilog, uint64_t synced_txg)
1355 {
1356 	itxg_t *itxg = &zilog->zl_itxg[synced_txg & TXG_MASK];
1357 	itxs_t *clean_me;
1358 
1359 	mutex_enter(&itxg->itxg_lock);
1360 	if (itxg->itxg_itxs == NULL || itxg->itxg_txg == ZILTEST_TXG) {
1361 		mutex_exit(&itxg->itxg_lock);
1362 		return;
1363 	}
1364 	ASSERT3U(itxg->itxg_txg, <=, synced_txg);
1365 	ASSERT(itxg->itxg_txg != 0);
1366 	ASSERT(zilog->zl_clean_taskq != NULL);
1367 	atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
1368 	itxg->itxg_sod = 0;
1369 	clean_me = itxg->itxg_itxs;
1370 	itxg->itxg_itxs = NULL;
1371 	itxg->itxg_txg = 0;
1372 	mutex_exit(&itxg->itxg_lock);
1373 	/*
1374 	 * Preferably start a task queue to free up the old itxs but
1375 	 * if taskq_dispatch can't allocate resources to do that then
1376 	 * free them in-line. This should be rare. Note, using TQ_SLEEP
1377 	 * created a bad performance problem.
1378 	 */
1379 	if (taskq_dispatch(zilog->zl_clean_taskq,
1380 	    (void (*)(void *))zil_itxg_clean, clean_me, TQ_NOSLEEP) == NULL)
1381 		zil_itxg_clean(clean_me);
1382 }
1383 
1384 /*
1385  * Get the list of itxs to commit into zl_itx_commit_list.
1386  */
1387 static void
1388 zil_get_commit_list(zilog_t *zilog)
1389 {
1390 	uint64_t otxg, txg;
1391 	list_t *commit_list = &zilog->zl_itx_commit_list;
1392 	uint64_t push_sod = 0;
1393 
1394 	if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
1395 		otxg = ZILTEST_TXG;
1396 	else
1397 		otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
1398 
1399 	/*
1400 	 * This is inherently racy, since there is nothing to prevent
1401 	 * the last synced txg from changing. That's okay since we'll
1402 	 * only commit things in the future.
1403 	 */
1404 	for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
1405 		itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
1406 
1407 		mutex_enter(&itxg->itxg_lock);
1408 		if (itxg->itxg_txg != txg) {
1409 			mutex_exit(&itxg->itxg_lock);
1410 			continue;
1411 		}
1412 
1413 		/*
1414 		 * If we're adding itx records to the zl_itx_commit_list,
1415 		 * then the zil better be dirty in this "txg". We can assert
1416 		 * that here since we're holding the itxg_lock which will
1417 		 * prevent spa_sync from cleaning it. Once we add the itxs
1418 		 * to the zl_itx_commit_list we must commit it to disk even
1419 		 * if it's unnecessary (i.e. the txg was synced).
1420 		 */
1421 		ASSERT(zilog_is_dirty_in_txg(zilog, txg) ||
1422 		    spa_freeze_txg(zilog->zl_spa) != UINT64_MAX);
1423 		list_move_tail(commit_list, &itxg->itxg_itxs->i_sync_list);
1424 		push_sod += itxg->itxg_sod;
1425 		itxg->itxg_sod = 0;
1426 
1427 		mutex_exit(&itxg->itxg_lock);
1428 	}
1429 	atomic_add_64(&zilog->zl_itx_list_sz, -push_sod);
1430 }
1431 
1432 /*
1433  * Move the async itxs for a specified object to commit into sync lists.
1434  */
1435 static void
1436 zil_async_to_sync(zilog_t *zilog, uint64_t foid)
1437 {
1438 	uint64_t otxg, txg;
1439 	itx_async_node_t *ian;
1440 	avl_tree_t *t;
1441 	avl_index_t where;
1442 
1443 	if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
1444 		otxg = ZILTEST_TXG;
1445 	else
1446 		otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
1447 
1448 	/*
1449 	 * This is inherently racy, since there is nothing to prevent
1450 	 * the last synced txg from changing.
1451 	 */
1452 	for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
1453 		itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
1454 
1455 		mutex_enter(&itxg->itxg_lock);
1456 		if (itxg->itxg_txg != txg) {
1457 			mutex_exit(&itxg->itxg_lock);
1458 			continue;
1459 		}
1460 
1461 		/*
1462 		 * If a foid is specified then find that node and append its
1463 		 * list. Otherwise walk the tree appending all the lists
1464 		 * to the sync list. We add to the end rather than the
1465 		 * beginning to ensure the create has happened.
1466 		 */
1467 		t = &itxg->itxg_itxs->i_async_tree;
1468 		if (foid != 0) {
1469 			ian = avl_find(t, &foid, &where);
1470 			if (ian != NULL) {
1471 				list_move_tail(&itxg->itxg_itxs->i_sync_list,
1472 				    &ian->ia_list);
1473 			}
1474 		} else {
1475 			void *cookie = NULL;
1476 
1477 			while ((ian = avl_destroy_nodes(t, &cookie)) != NULL) {
1478 				list_move_tail(&itxg->itxg_itxs->i_sync_list,
1479 				    &ian->ia_list);
1480 				list_destroy(&ian->ia_list);
1481 				kmem_free(ian, sizeof (itx_async_node_t));
1482 			}
1483 		}
1484 		mutex_exit(&itxg->itxg_lock);
1485 	}
1486 }
1487 
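/*
 * Write all itxs on the commit list out to the intent log and wait for the
 * log blocks (and any needed vdev cache flushes) to reach stable storage.
 * Called by the current batch writer with zl_lock held; the lock is dropped
 * while the log I/O is in progress and reacquired before returning.
 */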
1488 static void
1489 zil_commit_writer(zilog_t *zilog)
1490 {
1491 	uint64_t txg;
1492 	itx_t *itx;
1493 	lwb_t *lwb;
1494 	spa_t *spa = zilog->zl_spa;
1495 	int error = 0;
1496 
1497 	ASSERT(zilog->zl_root_zio == NULL);
1498 
1499 	mutex_exit(&zilog->zl_lock);
1500 
1501 	zil_get_commit_list(zilog);
1502 
1503 	/*
1504 	 * Return if there's nothing to commit before we dirty the fs by
1505 	 * calling zil_create().
1506 	 */
1507 	if (list_head(&zilog->zl_itx_commit_list) == NULL) {
1508 		mutex_enter(&zilog->zl_lock);
1509 		return;
1510 	}
1511 
1512 	if (zilog->zl_suspend) {
1513 		lwb = NULL;
1514 	} else {
1515 		lwb = list_tail(&zilog->zl_lwb_list);
1516 		if (lwb == NULL)
1517 			lwb = zil_create(zilog);
1518 	}
1519 
1520 	DTRACE_PROBE1(zil__cw1, zilog_t *, zilog);
1521 	while (itx = list_head(&zilog->zl_itx_commit_list)) {
1522 		txg = itx->itx_lr.lrc_txg;
1523 		ASSERT3U(txg, !=, 0);
1524 
1525 		/*
1526 		 * This is inherently racy and may result in us writing
1527 		 * out a log block for a txg that was just synced. This is
1528 		 * ok since we'll end up cleaning up that log block the next
1529 		 * time we call zil_sync().
1530 		 */
1531 		if (txg > spa_last_synced_txg(spa) || txg > spa_freeze_txg(spa))
1532 			lwb = zil_lwb_commit(zilog, itx, lwb);
1533 		list_remove(&zilog->zl_itx_commit_list, itx);
1534 		kmem_free(itx, offsetof(itx_t, itx_lr)
1535 		    + itx->itx_lr.lrc_reclen);
1536 	}
1537 	DTRACE_PROBE1(zil__cw2, zilog_t *, zilog);
1538 
1539 	/* write the last block out */
1540 	if (lwb != NULL && lwb->lwb_zio != NULL)
1541 		lwb = zil_lwb_write_start(zilog, lwb);
1542 
1543 	zilog->zl_cur_used = 0;
1544 
1545 	/*
1546 	 * Wait if necessary for the log blocks to be on stable storage.
1547 	 */
1548 	if (zilog->zl_root_zio) {
1549 		error = zio_wait(zilog->zl_root_zio);
1550 		zilog->zl_root_zio = NULL;
1551 		zil_flush_vdevs(zilog);
1552 	}
1553 
1554 	if (error || lwb == NULL)
1555 		txg_wait_synced(zilog->zl_dmu_pool, 0);
1556 
1557 	mutex_enter(&zilog->zl_lock);
1558 
1559 	/*
1560 	 * Remember the highest committed log sequence number for ztest.
1561 	 * We only update this value when all the log writes succeeded,
1562 	 * because ztest wants to ASSERT that it got the whole log chain.
1563 	 */
1564 	if (error == 0 && lwb != NULL)
1565 		zilog->zl_commit_lr_seq = zilog->zl_lr_seq;
1566 }
1567 
1568 /*
1569  * Commit zfs transactions to stable storage.
1570  * If foid is 0 push out all transactions, otherwise push only those
1571  * for that object or might reference that object.
1572  *
1573  * itxs are committed in batches. In a heavily stressed zil there will be
1574  * a commit writer thread who is writing out a bunch of itxs to the log
1575  * for a set of committing threads (cthreads) in the same batch as the writer.
1576  * Those cthreads are all waiting on the same cv for that batch.
1577  *
1578  * There will also be a different and growing batch of threads that are
1579  * waiting to commit (qthreads). When the committing batch completes
1580  * a transition occurs such that the cthreads exit and the qthreads become
1581  * cthreads. One of the new cthreads becomes the writer thread for the
1582  * batch. Any new threads arriving become new qthreads.
1583  *
1584  * Only 2 condition variables are needed, with no transition between
1585  * them; the cvs simply flip-flop between serving qthreads
1586  * and cthreads.
1587  *
1588  * Using this scheme we can efficiently wake up only those threads
1589  * that have been committed.
1590  */
1591 void
1592 zil_commit(zilog_t *zilog, uint64_t foid)
1593 {
1594 	uint64_t mybatch;
1595 
1596 	if (zilog->zl_sync == ZFS_SYNC_DISABLED)
1597 		return;
1598 
1599 	/* move the async itxs for the foid to the sync queues */
1600 	zil_async_to_sync(zilog, foid);
1601 
1602 	mutex_enter(&zilog->zl_lock);
1603 	mybatch = zilog->zl_next_batch;
1604 	while (zilog->zl_writer) {
1605 		cv_wait(&zilog->zl_cv_batch[mybatch & 1], &zilog->zl_lock);
1606 		if (mybatch <= zilog->zl_com_batch) {
1607 			mutex_exit(&zilog->zl_lock);
1608 			return;
1609 		}
1610 	}
1611 
1612 	zilog->zl_next_batch++;
1613 	zilog->zl_writer = B_TRUE;
1614 	zil_commit_writer(zilog);
1615 	zilog->zl_com_batch = mybatch;
1616 	zilog->zl_writer = B_FALSE;
1617 	mutex_exit(&zilog->zl_lock);
1618 
1619 	/* wake up one thread to become the next writer */
1620 	cv_signal(&zilog->zl_cv_batch[(mybatch+1) & 1]);
1621 
1622 	/* wake up all threads waiting for this batch to be committed */
1623 	cv_broadcast(&zilog->zl_cv_batch[mybatch & 1]);
1624 }
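
/*
 * A typical consumer builds and assigns itxs as it modifies the file
 * system and then forces them out on a synchronous request, roughly
 * (a sketch only; constructing the log record is caller-specific):
 *
 *	itx = zil_itx_create(TX_WRITE, sizeof (lr_write_t));
 *	(fill in the lr_write_t body and itx_wr_state)
 *	zil_itx_assign(zilog, itx, tx);
 *	...
 *	zil_commit(zilog, foid);	(on fsync of foid; 0 commits everything)
 */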
1625 
1626 /*
1627  * Called in syncing context to free committed log blocks and update log header.
1628  */
1629 void
1630 zil_sync(zilog_t *zilog, dmu_tx_t *tx)
1631 {
1632 	zil_header_t *zh = zil_header_in_syncing_context(zilog);
1633 	uint64_t txg = dmu_tx_get_txg(tx);
1634 	spa_t *spa = zilog->zl_spa;
1635 	uint64_t *replayed_seq = &zilog->zl_replayed_seq[txg & TXG_MASK];
1636 	lwb_t *lwb;
1637 
1638 	/*
1639 	 * We don't zero out zl_destroy_txg, so make sure we don't try
1640 	 * to destroy it twice.
1641 	 */
1642 	if (spa_sync_pass(spa) != 1)
1643 		return;
1644 
1645 	mutex_enter(&zilog->zl_lock);
1646 
1647 	ASSERT(zilog->zl_stop_sync == 0);
1648 
1649 	if (*replayed_seq != 0) {
1650 		ASSERT(zh->zh_replay_seq < *replayed_seq);
1651 		zh->zh_replay_seq = *replayed_seq;
1652 		*replayed_seq = 0;
1653 	}
1654 
1655 	if (zilog->zl_destroy_txg == txg) {
1656 		blkptr_t blk = zh->zh_log;
1657 
1658 		ASSERT(list_head(&zilog->zl_lwb_list) == NULL);
1659 
1660 		bzero(zh, sizeof (zil_header_t));
1661 		bzero(zilog->zl_replayed_seq, sizeof (zilog->zl_replayed_seq));
1662 
1663 		if (zilog->zl_keep_first) {
1664 			/*
1665 			 * If this block was part of log chain that couldn't
1666 			 * be claimed because a device was missing during
1667 			 * zil_claim(), but that device later returns,
1668 			 * then this block could erroneously appear valid.
1669 			 * To guard against this, assign a new GUID to the new
1670 			 * log chain so it doesn't matter what blk points to.
1671 			 */
1672 			zil_init_log_chain(zilog, &blk);
1673 			zh->zh_log = blk;
1674 		}
1675 	}
1676 
1677 	while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
1678 		zh->zh_log = lwb->lwb_blk;
1679 		if (lwb->lwb_buf != NULL || lwb->lwb_max_txg > txg)
1680 			break;
1681 		list_remove(&zilog->zl_lwb_list, lwb);
1682 		zio_free_zil(spa, txg, &lwb->lwb_blk);
1683 		kmem_cache_free(zil_lwb_cache, lwb);
1684 
1685 		/*
1686 		 * If we don't have anything left in the lwb list then
1687 		 * we've had an allocation failure and we need to zero
1688 		 * out the zil_header blkptr so that we don't end
1689 		 * up freeing the same block twice.
1690 		 */
1691 		if (list_head(&zilog->zl_lwb_list) == NULL)
1692 			BP_ZERO(&zh->zh_log);
1693 	}
1694 	mutex_exit(&zilog->zl_lock);
1695 }
1696 
1697 void
1698 zil_init(void)
1699 {
1700 	zil_lwb_cache = kmem_cache_create("zil_lwb_cache",
1701 	    sizeof (struct lwb), 0, NULL, NULL, NULL, NULL, NULL, 0);
1702 }
1703 
1704 void
1705 zil_fini(void)
1706 {
1707 	kmem_cache_destroy(zil_lwb_cache);
1708 }
1709 
1710 void
1711 zil_set_sync(zilog_t *zilog, uint64_t sync)
1712 {
1713 	zilog->zl_sync = sync;
1714 }
1715 
1716 void
1717 zil_set_logbias(zilog_t *zilog, uint64_t logbias)
1718 {
1719 	zilog->zl_logbias = logbias;
1720 }
1721 
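/*
 * Allocate and initialize the in-memory zilog for an objset.
 */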
1722 zilog_t *
1723 zil_alloc(objset_t *os, zil_header_t *zh_phys)
1724 {
1725 	zilog_t *zilog;
1726 
1727 	zilog = kmem_zalloc(sizeof (zilog_t), KM_SLEEP);
1728 
1729 	zilog->zl_header = zh_phys;
1730 	zilog->zl_os = os;
1731 	zilog->zl_spa = dmu_objset_spa(os);
1732 	zilog->zl_dmu_pool = dmu_objset_pool(os);
1733 	zilog->zl_destroy_txg = TXG_INITIAL - 1;
1734 	zilog->zl_logbias = dmu_objset_logbias(os);
1735 	zilog->zl_sync = dmu_objset_syncprop(os);
1736 	zilog->zl_next_batch = 1;
1737 
1738 	mutex_init(&zilog->zl_lock, NULL, MUTEX_DEFAULT, NULL);
1739 
1740 	for (int i = 0; i < TXG_SIZE; i++) {
1741 		mutex_init(&zilog->zl_itxg[i].itxg_lock, NULL,
1742 		    MUTEX_DEFAULT, NULL);
1743 	}
1744 
1745 	list_create(&zilog->zl_lwb_list, sizeof (lwb_t),
1746 	    offsetof(lwb_t, lwb_node));
1747 
1748 	list_create(&zilog->zl_itx_commit_list, sizeof (itx_t),
1749 	    offsetof(itx_t, itx_node));
1750 
1751 	mutex_init(&zilog->zl_vdev_lock, NULL, MUTEX_DEFAULT, NULL);
1752 
1753 	avl_create(&zilog->zl_vdev_tree, zil_vdev_compare,
1754 	    sizeof (zil_vdev_node_t), offsetof(zil_vdev_node_t, zv_node));
1755 
1756 	cv_init(&zilog->zl_cv_writer, NULL, CV_DEFAULT, NULL);
1757 	cv_init(&zilog->zl_cv_suspend, NULL, CV_DEFAULT, NULL);
1758 	cv_init(&zilog->zl_cv_batch[0], NULL, CV_DEFAULT, NULL);
1759 	cv_init(&zilog->zl_cv_batch[1], NULL, CV_DEFAULT, NULL);
1760 
1761 	return (zilog);
1762 }
1763 
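/*
 * Free the in-memory zilog allocated by zil_alloc(), releasing any leftover
 * itxs that never dirtied a txg; the lwb list must already be empty.
 */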
1764 void
1765 zil_free(zilog_t *zilog)
1766 {
1767 	zilog->zl_stop_sync = 1;
1768 
1769 	ASSERT0(zilog->zl_suspend);
1770 	ASSERT0(zilog->zl_suspending);
1771 
1772 	ASSERT(list_is_empty(&zilog->zl_lwb_list));
1773 	list_destroy(&zilog->zl_lwb_list);
1774 
1775 	avl_destroy(&zilog->zl_vdev_tree);
1776 	mutex_destroy(&zilog->zl_vdev_lock);
1777 
1778 	ASSERT(list_is_empty(&zilog->zl_itx_commit_list));
1779 	list_destroy(&zilog->zl_itx_commit_list);
1780 
1781 	for (int i = 0; i < TXG_SIZE; i++) {
1782 		/*
1783 		 * It's possible for an itx to be generated that doesn't dirty
1784 		 * a txg (e.g. ztest TX_TRUNCATE), so there's no zil_clean()
1785 		 * callback to remove the entry; we remove those here.
1786 		 *
1787 		 * Also free up the ziltest itxs.
1788 		 */
1789 		if (zilog->zl_itxg[i].itxg_itxs)
1790 			zil_itxg_clean(zilog->zl_itxg[i].itxg_itxs);
1791 		mutex_destroy(&zilog->zl_itxg[i].itxg_lock);
1792 	}
1793 
1794 	mutex_destroy(&zilog->zl_lock);
1795 
1796 	cv_destroy(&zilog->zl_cv_writer);
1797 	cv_destroy(&zilog->zl_cv_suspend);
1798 	cv_destroy(&zilog->zl_cv_batch[0]);
1799 	cv_destroy(&zilog->zl_cv_batch[1]);
1800 
1801 	kmem_free(zilog, sizeof (zilog_t));
1802 }
1803 
1804 /*
1805  * Open an intent log.
1806  */
1807 zilog_t *
1808 zil_open(objset_t *os, zil_get_data_t *get_data)
1809 {
1810 	zilog_t *zilog = dmu_objset_zil(os);
1811 
1812 	ASSERT(zilog->zl_clean_taskq == NULL);
1813 	ASSERT(zilog->zl_get_data == NULL);
1814 	ASSERT(list_is_empty(&zilog->zl_lwb_list));
1815 
1816 	zilog->zl_get_data = get_data;
1817 	zilog->zl_clean_taskq = taskq_create("zil_clean", 1, minclsyspri,
1818 	    2, 2, TASKQ_PREPOPULATE);
1819 
1820 	return (zilog);
1821 }
1822 
1823 /*
1824  * Close an intent log.
1825  */
1826 void
1827 zil_close(zilog_t *zilog)
1828 {
1829 	lwb_t *lwb;
1830 	uint64_t txg = 0;
1831 
1832 	zil_commit(zilog, 0); /* commit all itx */
1833 
1834 	/*
1835 	 * The lwb_max_txg for the stubby lwb will reflect the last activity
1836 	 * for the zil.  After a txg_wait_synced() on the txg we know all the
1837 	 * callbacks have occurred that may clean the zil.  Only then can we
1838 	 * destroy the zl_clean_taskq.
1839 	 */
1840 	mutex_enter(&zilog->zl_lock);
1841 	lwb = list_tail(&zilog->zl_lwb_list);
1842 	if (lwb != NULL)
1843 		txg = lwb->lwb_max_txg;
1844 	mutex_exit(&zilog->zl_lock);
1845 	if (txg)
1846 		txg_wait_synced(zilog->zl_dmu_pool, txg);
1847 
1848 	if (zilog_is_dirty(zilog))
1849 		zfs_dbgmsg("zil (%p) is dirty, txg %llu", zilog, txg);
1850 	VERIFY(!zilog_is_dirty(zilog));
1851 
1852 	taskq_destroy(zilog->zl_clean_taskq);
1853 	zilog->zl_clean_taskq = NULL;
1854 	zilog->zl_get_data = NULL;
1855 
1856 	/*
1857 	 * We should have only one LWB left on the list; remove it now.
1858 	 */
1859 	mutex_enter(&zilog->zl_lock);
1860 	lwb = list_head(&zilog->zl_lwb_list);
1861 	if (lwb != NULL) {
1862 		ASSERT(lwb == list_tail(&zilog->zl_lwb_list));
1863 		list_remove(&zilog->zl_lwb_list, lwb);
1864 		zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
1865 		kmem_cache_free(zil_lwb_cache, lwb);
1866 	}
1867 	mutex_exit(&zilog->zl_lock);
1868 }
1869 
1870 static char *suspend_tag = "zil suspending";
1871 
1872 /*
1873  * Suspend an intent log.  While in suspended mode, we still honor
1874  * synchronous semantics, but we rely on txg_wait_synced() to do it.
1875  * On old version pools, we suspend the log briefly when taking a
1876  * snapshot so that it will have an empty intent log.
1877  *
1878  * Long holds are not really intended to be used the way we do here --
1879  * held for such a short time.  A concurrent caller of dsl_dataset_long_held()
1880  * could fail.  Therefore we take pains to only put a long hold if it is
1881  * actually necessary.  Fortunately, it will only be necessary if the
1882  * objset is currently mounted (or the ZVOL equivalent).  In that case it
1883  * will already have a long hold, so we are not really making things any worse.
1884  *
1885  * Ideally, we would locate the existing long-holder (i.e. the zfsvfs_t or
1886 	 * zvol_state_t), and use its mechanism to prevent its hold from being
1887  * dropped (e.g. VFS_HOLD()).  However, that would be even more pain for
1888  * very little gain.
1889  *
1890 	 * If cookiep == NULL, this does both the suspend & resume.
1891  * Otherwise, it returns with the dataset "long held", and the cookie
1892  * should be passed into zil_resume().
1893  */
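/*
 * Illustrative usage of the cookie form (a sketch only, not lifted
 * from any particular caller; "osname" is a dataset name and error
 * handling is elided):
 *
 *	void *cookie;
 *
 *	if (zil_suspend(osname, &cookie) == 0) {
 *		... dataset is long held; its ZIL is empty and suspended ...
 *		zil_resume(cookie);
 *	}
 */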
1894 int
1895 zil_suspend(const char *osname, void **cookiep)
1896 {
1897 	objset_t *os;
1898 	zilog_t *zilog;
1899 	const zil_header_t *zh;
1900 	int error;
1901 
1902 	error = dmu_objset_hold(osname, suspend_tag, &os);
1903 	if (error != 0)
1904 		return (error);
1905 	zilog = dmu_objset_zil(os);
1906 
1907 	mutex_enter(&zilog->zl_lock);
1908 	zh = zilog->zl_header;
1909 
1910 	if (zh->zh_flags & ZIL_REPLAY_NEEDED) {		/* unplayed log */
1911 		mutex_exit(&zilog->zl_lock);
1912 		dmu_objset_rele(os, suspend_tag);
1913 		return (SET_ERROR(EBUSY));
1914 	}
1915 
1916 	/*
1917 	 * Don't put a long hold in the cases where we can avoid it.  This
1918 	 * is when there is no cookie, so we are doing a suspend & resume
1919 	 * (i.e. called from zil_vdev_offline()), and there's nothing to do
1920 	 * for the suspend because it's already suspended, or there's no ZIL.
1921 	 */
1922 	if (cookiep == NULL && !zilog->zl_suspending &&
1923 	    (zilog->zl_suspend > 0 || BP_IS_HOLE(&zh->zh_log))) {
1924 		mutex_exit(&zilog->zl_lock);
1925 		dmu_objset_rele(os, suspend_tag);
1926 		return (0);
1927 	}
1928 
1929 	dsl_dataset_long_hold(dmu_objset_ds(os), suspend_tag);
1930 	dsl_pool_rele(dmu_objset_pool(os), suspend_tag);
1931 
1932 	zilog->zl_suspend++;
1933 
1934 	if (zilog->zl_suspend > 1) {
1935 		/*
1936 		 * Someone else is already suspending it.
1937 		 * Just wait for them to finish.
1938 		 */
1939 
1940 		while (zilog->zl_suspending)
1941 			cv_wait(&zilog->zl_cv_suspend, &zilog->zl_lock);
1942 		mutex_exit(&zilog->zl_lock);
1943 
1944 		if (cookiep == NULL)
1945 			zil_resume(os);
1946 		else
1947 			*cookiep = os;
1948 		return (0);
1949 	}
1950 
1951 	/*
1952 	 * If there is no pointer to an on-disk block, this ZIL must not
1953 	 * be active (e.g. filesystem not mounted), so there's nothing
1954 	 * to clean up.
1955 	 */
1956 	if (BP_IS_HOLE(&zh->zh_log)) {
1957 		ASSERT(cookiep != NULL); /* fast path already handled */
1958 
1959 		*cookiep = os;
1960 		mutex_exit(&zilog->zl_lock);
1961 		return (0);
1962 	}
1963 
1964 	zilog->zl_suspending = B_TRUE;
1965 	mutex_exit(&zilog->zl_lock);
1966 
1967 	zil_commit(zilog, 0);
1968 
1969 	zil_destroy(zilog, B_FALSE);
1970 
1971 	mutex_enter(&zilog->zl_lock);
1972 	zilog->zl_suspending = B_FALSE;
1973 	cv_broadcast(&zilog->zl_cv_suspend);
1974 	mutex_exit(&zilog->zl_lock);
1975 
1976 	if (cookiep == NULL)
1977 		zil_resume(os);
1978 	else
1979 		*cookiep = os;
1980 	return (0);
1981 }
1982 
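/*
 * Undo a successful zil_suspend(): drop the suspend count and release
 * the holds that zil_suspend() placed on the dataset.  The cookie is
 * the objset_t that zil_suspend() returned through *cookiep.
 */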
1983 void
1984 zil_resume(void *cookie)
1985 {
1986 	objset_t *os = cookie;
1987 	zilog_t *zilog = dmu_objset_zil(os);
1988 
1989 	mutex_enter(&zilog->zl_lock);
1990 	ASSERT(zilog->zl_suspend != 0);
1991 	zilog->zl_suspend--;
1992 	mutex_exit(&zilog->zl_lock);
1993 	dsl_dataset_long_rele(dmu_objset_ds(os), suspend_tag);
1994 	dsl_dataset_rele(dmu_objset_ds(os), suspend_tag);
1995 }
1996 
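/*
 * State handed to zil_replay_log_record() by zil_parse() during
 * replay: the per-txtype replay vector and its argument, whether the
 * log records need byteswapping, and a scratch buffer that holds a
 * private copy of each record (plus its data, for TX_WRITE records).
 */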
1997 typedef struct zil_replay_arg {
1998 	zil_replay_func_t **zr_replay;
1999 	void		*zr_arg;
2000 	boolean_t	zr_byteswap;
2001 	char		*zr_lr;
2002 } zil_replay_arg_t;
2003 
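/*
 * Note a failed replay of a log record: undo the advance of
 * zl_replaying_seq (the record wasn't actually replayed) and log a
 * warning identifying the dataset, sequence number, and txtype.
 */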
2004 static int
2005 zil_replay_error(zilog_t *zilog, lr_t *lr, int error)
2006 {
2007 	char name[ZFS_MAX_DATASET_NAME_LEN];
2008 
2009 	zilog->zl_replaying_seq--;	/* didn't actually replay this one */
2010 
2011 	dmu_objset_name(zilog->zl_os, name);
2012 
2013 	cmn_err(CE_WARN, "ZFS replay transaction error %d, "
2014 	    "dataset %s, seq 0x%llx, txtype %llu %s\n", error, name,
2015 	    (u_longlong_t)lr->lrc_seq,
2016 	    (u_longlong_t)(lr->lrc_txtype & ~TX_CI),
2017 	    (lr->lrc_txtype & TX_CI) ? "CI" : "");
2018 
2019 	return (error);
2020 }
2021 
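/*
 * zil_parse() record callback used during replay: apply a single log
 * record through the appropriate replay vector, skipping records that
 * have already been replayed or already committed to the pool.
 */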
2022 static int
2023 zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
2024 {
2025 	zil_replay_arg_t *zr = zra;
2026 	const zil_header_t *zh = zilog->zl_header;
2027 	uint64_t reclen = lr->lrc_reclen;
2028 	uint64_t txtype = lr->lrc_txtype;
2029 	int error = 0;
2030 
2031 	zilog->zl_replaying_seq = lr->lrc_seq;
2032 
2033 	if (lr->lrc_seq <= zh->zh_replay_seq)	/* already replayed */
2034 		return (0);
2035 
2036 	if (lr->lrc_txg < claim_txg)		/* already committed */
2037 		return (0);
2038 
2039 	/* Strip the case-insensitive bit, still present in the log record */
2040 	txtype &= ~TX_CI;
2041 
2042 	if (txtype == 0 || txtype >= TX_MAX_TYPE)
2043 		return (zil_replay_error(zilog, lr, EINVAL));
2044 
2045 	/*
2046 	 * If this record type can be logged out of order, the object
2047 	 * (lr_foid) may no longer exist.  That's legitimate, not an error.
2048 	 */
2049 	if (TX_OOO(txtype)) {
2050 		error = dmu_object_info(zilog->zl_os,
2051 		    ((lr_ooo_t *)lr)->lr_foid, NULL);
2052 		if (error == ENOENT || error == EEXIST)
2053 			return (0);
2054 	}
2055 
2056 	/*
2057 	 * Make a copy of the data so we can revise and extend it.
2058 	 */
2059 	bcopy(lr, zr->zr_lr, reclen);
2060 
2061 	/*
2062 	 * If this is a TX_WRITE with a blkptr, suck in the data.
2063 	 */
2064 	if (txtype == TX_WRITE && reclen == sizeof (lr_write_t)) {
2065 		error = zil_read_log_data(zilog, (lr_write_t *)lr,
2066 		    zr->zr_lr + reclen);
2067 		if (error != 0)
2068 			return (zil_replay_error(zilog, lr, error));
2069 	}
2070 
2071 	/*
2072 	 * The log block containing this lr may have been byteswapped
2073 	 * so that we can easily examine common fields like lrc_txtype.
2074 	 * However, the log is a mix of different record types, and only the
2075 	 * replay vectors know how to byteswap their records.  Therefore, if
2076 	 * the lr was byteswapped, undo it before invoking the replay vector.
2077 	 */
2078 	if (zr->zr_byteswap)
2079 		byteswap_uint64_array(zr->zr_lr, reclen);
2080 
2081 	/*
2082 	 * We must now do two things atomically: replay this log record,
2083 	 * and update the log header sequence number to reflect the fact that
2084 	 * we did so. At the end of each replay function the sequence number
2085 	 * is updated if we are in replay mode.
2086 	 */
2087 	error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, zr->zr_byteswap);
2088 	if (error != 0) {
2089 		/*
2090 		 * The DMU's dnode layer doesn't see removes until the txg
2091 		 * commits, so a subsequent claim can spuriously fail with
2092 		 * EEXIST.  If we receive any error, we sync out any pending
2093 		 * removes and then retry the transaction.  Note that we
2094 		 * specify B_FALSE for byteswap now, so we don't do it twice.
2095 		 */
2096 		txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
2097 		error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, B_FALSE);
2098 		if (error != 0)
2099 			return (zil_replay_error(zilog, lr, error));
2100 	}
2101 	return (0);
2102 }
2103 
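/*
 * zil_parse() block callback used during replay: just count the log
 * blocks visited so that zl_replay_blks reflects the length of the
 * chain.
 */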
2104 /* ARGSUSED */
2105 static int
2106 zil_incr_blks(zilog_t *zilog, blkptr_t *bp, void *arg, uint64_t claim_txg)
2107 {
2108 	zilog->zl_replay_blks++;
2109 
2110 	return (0);
2111 }
2112 
2113 /*
2114  * If this dataset has a non-empty intent log, replay it and destroy it.
2115  */
2116 void
2117 zil_replay(objset_t *os, void *arg, zil_replay_func_t *replay_func[TX_MAX_TYPE])
2118 {
2119 	zilog_t *zilog = dmu_objset_zil(os);
2120 	const zil_header_t *zh = zilog->zl_header;
2121 	zil_replay_arg_t zr;
2122 
2123 	if ((zh->zh_flags & ZIL_REPLAY_NEEDED) == 0) {
2124 		zil_destroy(zilog, B_TRUE);
2125 		return;
2126 	}
2127 
2128 	zr.zr_replay = replay_func;
2129 	zr.zr_arg = arg;
2130 	zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zh->zh_log);
2131 	zr.zr_lr = kmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
2132 
2133 	/*
2134 	 * Wait for in-progress removes to sync before starting replay.
2135 	 */
2136 	txg_wait_synced(zilog->zl_dmu_pool, 0);
2137 
2138 	zilog->zl_replay = B_TRUE;
2139 	zilog->zl_replay_time = ddi_get_lbolt();
2140 	ASSERT(zilog->zl_replay_blks == 0);
2141 	(void) zil_parse(zilog, zil_incr_blks, zil_replay_log_record, &zr,
2142 	    zh->zh_claim_txg);
2143 	kmem_free(zr.zr_lr, 2 * SPA_MAXBLOCKSIZE);
2144 
2145 	zil_destroy(zilog, B_FALSE);
2146 	txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
2147 	zilog->zl_replay = B_FALSE;
2148 }
2149 
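/*
 * Called from the intent-log generation paths (e.g. the zfs_log_*
 * routines) to decide whether a new itx is needed.  Returns B_TRUE
 * when logging should be skipped: either the dataset has
 * sync=disabled, or we are replaying the log, in which case the
 * sequence number just replayed is recorded against the txg.
 */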
2150 boolean_t
2151 zil_replaying(zilog_t *zilog, dmu_tx_t *tx)
2152 {
2153 	if (zilog->zl_sync == ZFS_SYNC_DISABLED)
2154 		return (B_TRUE);
2155 
2156 	if (zilog->zl_replay) {
2157 		dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
2158 		zilog->zl_replayed_seq[dmu_tx_get_txg(tx) & TXG_MASK] =
2159 		    zilog->zl_replaying_seq;
2160 		return (B_TRUE);
2161 	}
2162 
2163 	return (B_FALSE);
2164 }
2165 
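/*
 * Ensure a dataset has no active intent log by doing a combined
 * suspend & resume (cookiep == NULL); used from the log-device
 * offline path.  Any failure is reported as EEXIST to indicate that
 * the ZIL could not be emptied.
 */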
2166 /* ARGSUSED */
2167 int
2168 zil_vdev_offline(const char *osname, void *arg)
2169 {
2170 	int error;
2171 
2172 	error = zil_suspend(osname, NULL);
2173 	if (error != 0)
2174 		return (SET_ERROR(EEXIST));
2175 	return (0);
2176 }
2177