/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Portions Copyright 2011 Martin Matuska
 * Copyright (c) 2013 by Delphix. All rights reserved.
 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_scan.h>
#include <sys/callb.h>

/*
 * ZFS Transaction Groups
 * ----------------------
 *
 * ZFS transaction groups are, as the name implies, groups of transactions
 * that act on persistent state. ZFS asserts consistency at the granularity of
 * these transaction groups. Each successive transaction group (txg) is
 * assigned a consecutive 64-bit identifier. There are three active
 * transaction group states: open, quiescing, or syncing. At any given time,
 * there may be an active txg associated with each state; each active txg may
 * either be processing, or blocked waiting to enter the next state. There may
 * be up to three active txgs, and there is always a txg in the open state
 * (though it may be blocked waiting to enter the quiescing state). In broad
 * strokes, transactions -- operations that change in-memory structures -- are
 * accepted into the txg in the open state, and are completed while the txg is
 * in the open or quiescing states. The accumulated changes are written to
 * disk in the syncing state.
 *
 * Open
 *
 * When a new txg becomes active, it first enters the open state. New
 * transactions -- updates to in-memory structures -- are assigned to the
 * currently open txg. There is always a txg in the open state so that ZFS can
 * accept new changes (though the txg may refuse new changes if it has hit
 * some limit). ZFS advances the open txg to the next state for a variety of
 * reasons, such as it hitting a time or size threshold, or the execution of
 * an administrative action that must be completed in the syncing state.
 *
 * Quiescing
 *
 * After a txg exits the open state, it enters the quiescing state. The
 * quiescing state is intended to provide a buffer between accepting new
 * transactions in the open state and writing them out to stable storage in
 * the syncing state. While quiescing, transactions can continue their
 * operation without delaying either of the other states. Typically, a txg is
 * in the quiescing state very briefly since the operations are bounded by
 * software latencies rather than, say, slower I/O latencies. After all
 * transactions complete, the txg is ready to enter the next state.
 *
 * Syncing
 *
 * In the syncing state, the in-memory state built up during the open and (to
 * a lesser degree) the quiescing states is written to stable storage. The
 * process of writing out modified data can, in turn, modify more data. For
 * example, when we write new blocks, we need to allocate space for them;
 * those allocations modify metadata (space maps)... which themselves must be
 * written to stable storage. During the sync state, ZFS iterates, writing out
 * data until it converges and all in-memory changes have been written out.
 * The first such pass is the largest as it encompasses all the modified user
 * data (as opposed to filesystem metadata). Subsequent passes typically have
 * far less data to write as they consist exclusively of filesystem metadata.
 *
 * To ensure convergence, after a certain number of passes ZFS begins
 * overwriting locations on stable storage that had been allocated earlier in
 * the syncing state (and subsequently freed). ZFS usually allocates new
 * blocks to optimize for large, continuous writes. For the syncing state to
 * converge, however, it must complete a pass where no new blocks are
 * allocated, since each allocation requires a modification of persistent
 * metadata. Further, to hasten convergence, after a prescribed number of
 * passes, ZFS also defers frees, and stops compressing.
 *
 * In addition to writing out user data, we must also execute synctasks during
 * the syncing context. A synctask is the mechanism by which some
 * administrative activities work such as creating and destroying snapshots or
 * datasets. Note that when a synctask is initiated it enters the open txg,
 * and ZFS then pushes that txg as quickly as possible to completion of the
 * syncing state in order to reduce the latency of the administrative
 * activity. To complete the syncing state, ZFS writes out a new uberblock,
 * the root of the tree of blocks that comprise all state stored on the ZFS
 * pool. Finally, if there is a quiesced txg waiting, we signal that it can
 * now transition to the syncing state.
 */
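
/*
 * The functions below expose the txg machinery to the rest of ZFS.  As an
 * illustrative sketch only (this is not actual DMU code, and the "..." steps
 * stand in for consumer work), a typical interaction with the open txg looks
 * roughly like:
 *
 *	txg_handle_t th;
 *	uint64_t txg = txg_hold_open(dp, &th);
 *	... assign the transaction to this txg; the txg cannot close yet ...
 *	txg_rele_to_quiesce(&th);
 *	... apply in-memory changes; the txg may quiesce but cannot finish ...
 *	txg_rele_to_sync(&th);
 *	txg_wait_synced(dp, txg);	(optional: block until it is on disk)
 */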

static void txg_sync_thread(dsl_pool_t *dp);
static void txg_quiesce_thread(dsl_pool_t *dp);

int zfs_txg_timeout = 5;	/* max seconds worth of delta per txg */

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&tx->tx_cpu[c].tc_open_lock, NULL, MUTEX_DEFAULT,
		    NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
			list_create(&tx->tx_cpu[c].tc_callbacks[i],
			    sizeof (dmu_tx_callback_t),
			    offsetof(dmu_tx_callback_t, dcb_node));
		}
	}

	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	mutex_destroy(&tx->tx_sync_lock);

	cv_destroy(&tx->tx_sync_more_cv);
	cv_destroy(&tx->tx_sync_done_cv);
	cv_destroy(&tx->tx_quiesce_more_cv);
	cv_destroy(&tx->tx_quiesce_done_cv);
	cv_destroy(&tx->tx_exit_cv);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_open_lock);
		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
		}
	}

	if (tx->tx_commit_cb_taskq != NULL)
		taskq_destroy(tx->tx_commit_cb_taskq);

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, clock_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time)
		(void) cv_timedwait(cv, &tx->tx_sync_lock,
		    ddi_get_lbolt() + time);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);

	/*
	 * We need to ensure that we've vacated the deferred space_maps.
	 */
	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

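/*
 * Get a handle on the currently open txg and keep it open.
 *
 * The txg cannot advance out of the open state until txg_rele_to_quiesce()
 * releases this CPU's tc_open_lock, and the handle counts against the txg
 * (preventing it from finishing quiescing) until txg_rele_to_sync() is
 * called.  Returns the txg number.
 */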
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_open_lock);
	txg = tx->tx_open_txg;

	mutex_enter(&tc->tc_lock);
	tc->tc_count[txg & TXG_MASK]++;
	mutex_exit(&tc->tc_lock);

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

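/*
 * Release the hold on the open state acquired by txg_hold_open().  From this
 * point the txg may begin quiescing, but it cannot finish quiescing until
 * the handle is released with txg_rele_to_sync().
 */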
void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	ASSERT(!MUTEX_HELD(&tc->tc_lock));
	mutex_exit(&tc->tc_open_lock);
}

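/*
 * Register commit callbacks for this handle's txg.  The callbacks are moved
 * onto the per-CPU callback list and dispatched (via a taskq) once the txg
 * has synced.
 */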
void
txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
	mutex_exit(&tc->tc_lock);
}

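/*
 * Drop this handle's hold on its txg.  Once every holder has released, the
 * txg can finish quiescing and be handed off to the sync thread.
 */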
void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}

/*
 * Blocks until all transactions in the group are committed.
 *
 * On return, the transaction group has reached a stable state in which it can
 * then be passed off to the syncing context.
 */
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tc_open_locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_open_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;
	tx->tx_open_time = gethrtime();

	DTRACE_PROBE2(txg__quiescing, dsl_pool_t *, dp, uint64_t, txg);
	DTRACE_PROBE2(txg__opened, dsl_pool_t *, dp, uint64_t, tx->tx_open_txg);

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_open_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to
	 * call txg_rele_to_sync().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

static void
txg_do_callbacks(list_t *cb_list)
{
	dmu_tx_do_callbacks(cb_list, 0);

	list_destroy(cb_list);

	kmem_free(cb_list, sizeof (list_t));
}

/*
 * Dispatch the commit callbacks registered on this txg to worker threads.
 *
 * If no callbacks are registered for a given TXG, nothing happens.
 * This function creates a taskq for the associated pool, if needed.
 */
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
	int c;
	tx_state_t *tx = &dp->dp_tx;
	list_t *cb_list;

	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		/*
		 * No need to lock tx_cpu_t at this point, since this can
		 * only be called once a txg has been synced.
		 */

		int g = txg & TXG_MASK;

		if (list_is_empty(&tc->tc_callbacks[g]))
			continue;

		if (tx->tx_commit_cb_taskq == NULL) {
			/*
			 * Commit callback taskq hasn't been created yet.
			 */
			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
			    max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
			    TASKQ_PREPOPULATE);
		}

		cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
		list_create(cb_list, sizeof (dmu_tx_callback_t),
		    offsetof(dmu_tx_callback_t, dcb_node));

		list_move_tail(cb_list, &tc->tc_callbacks[g]);

		(void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
		    txg_do_callbacks, cb_list, TQ_SLEEP);
	}
}

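/*
 * Sync thread: waits until a txg has been quiesced and handed off (prompting
 * the quiesce thread when there is a waiter, enough dirty data, or the
 * timeout expires), syncs it via spa_sync(), and then dispatches that txg's
 * commit callbacks.
 */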
static void
txg_sync_thread(dsl_pool_t *dp)
{
	spa_t *spa = dp->dp_spa;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		uint64_t timeout = zfs_txg_timeout * hz;
		uint64_t timer;
		uint64_t txg;

		/*
		 * We sync when we're scanning, there's someone waiting
		 * on us, the quiesce thread has handed off a txg to us,
		 * there's enough dirty data to warrant a sync, or we
		 * have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while (!dsl_scan_active(dp->dp_scan) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0 &&
		    dp->dp_dirty_total < zfs_dirty_data_sync) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = ddi_get_lbolt() - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		DTRACE_PROBE2(txg__syncing, dsl_pool_t *, dp, uint64_t, txg);
		cv_broadcast(&tx->tx_quiesce_more_cv);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);

		start = ddi_get_lbolt();
		spa_sync(spa, txg);
		delta = ddi_get_lbolt() - start;

		mutex_enter(&tx->tx_sync_lock);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		DTRACE_PROBE2(txg__synced, dsl_pool_t *, dp, uint64_t, txg);
		cv_broadcast(&tx->tx_sync_done_cv);

		/*
		 * Dispatch commit callbacks to worker threads.
		 */
		txg_dispatch_callbacks(dp, txg);
	}
}

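/*
 * Quiesce thread: when there is demand for a new txg, closes the currently
 * open txg, waits for all of its transactions to complete, and hands the
 * quiesced txg off to the sync thread.
 */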
static void
txg_quiesce_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		DTRACE_PROBE2(txg__quiesced, dsl_pool_t *, dp, uint64_t, txg);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}

/*
 * Delay this thread by delay nanoseconds if we are still in the open
 * transaction group and there is already a waiting txg quiescing or quiesced.
 * Abort the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution)
{
	tx_state_t *tx = &dp->dp_tx;
	hrtime_t start = gethrtime();

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (gethrtime() - start < delay &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) {
		(void) cv_timedwait_hires(&tx->tx_quiesce_more_cv,
		    &tx->tx_sync_lock, delay, resolution, 0);
	}

	mutex_exit(&tx->tx_sync_lock);
}

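/*
 * Block until the given txg has been synced to disk.  If txg is 0, wait
 * until TXG_DEFER_SIZE txgs past the currently open txg have synced, which
 * also covers the deferred-free window (cf. txg_sync_stop()).
 */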
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	ASSERT(!dsl_pool_config_held(dp));

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + TXG_DEFER_SIZE;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

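/*
 * Block until the given txg (or a later one) is open.  If txg is 0, wait
 * until a new txg, beyond the currently open one, has opened.
 */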
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	ASSERT(!dsl_pool_config_held(dp));

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

/*
 * If there isn't a txg syncing or in the pipeline, push another txg through
 * the pipeline by quiescing the open txg.
 */
void
txg_kick(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	ASSERT(!dsl_pool_config_held(dp));

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_syncing_txg == 0 &&
	    tx->tx_quiesce_txg_waiting <= tx->tx_open_txg &&
	    tx->tx_sync_txg_waiting <= tx->tx_synced_txg &&
	    tx->tx_quiesced_txg <= tx->tx_synced_txg) {
		tx->tx_quiesce_txg_waiting = tx->tx_open_txg + 1;
		cv_broadcast(&tx->tx_quiesce_more_cv);
	}
	mutex_exit(&tx->tx_sync_lock);
}

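/*
 * Report whether the pipeline is stalled: someone is waiting for a txg
 * beyond the currently open one, i.e. the open txg needs to quiesce but has
 * not yet done so.
 */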
boolean_t
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

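/*
 * Report whether the sync thread still has work outstanding: either a waiter
 * needs a txg at or beyond the one currently syncing, or a quiesced txg has
 * been handed off but not yet consumed.
 */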
boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
	    tx->tx_quiesced_txg != 0);
}

/*
 * Per-txg object lists.
 */
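/*
 * Illustrative sketch (the structure and field names here are hypothetical):
 * a consumer embeds a txg_node_t in its object and tells the list where to
 * find it, then adds and removes the object by its containing pointer:
 *
 *	typedef struct foo {
 *		...
 *		txg_node_t	foo_txg_node;
 *	} foo_t;
 *
 *	txg_list_create(&fl, offsetof(foo_t, foo_txg_node));
 *	(void) txg_list_add(&fl, foo, txg);
 *	...
 *	foo = txg_list_remove(&fl, txg);
 */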
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

boolean_t
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}

/*
 * Add an entry to the list (unless it's already on the list).
 * Returns B_TRUE if it was actually added.
 */
boolean_t
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	boolean_t add;

	mutex_enter(&tl->tl_lock);
	add = (tn->tn_member[t] == 0);
	if (add) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (add);
}

/*
 * Add an entry to the end of the list, unless it's already on the list.
 * (walks list to find end)
 * Returns B_TRUE if it was actually added.
 */
boolean_t
txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	boolean_t add;

	mutex_enter(&tl->tl_lock);
	add = (tn->tn_member[t] == 0);
	if (add) {
		txg_node_t **tp;

		for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
			continue;

		tn->tn_member[t] = 1;
		tn->tn_next[t] = NULL;
		*tp = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (add);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		p = (char *)tn - tl->tl_offset;
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

boolean_t
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	return (tn->tn_member[t] != 0);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = tl->tl_head[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	tn = tn->tn_next[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}