xref: /illumos-gate/usr/src/uts/common/fs/zfs/txg.c (revision fa9e4066f08beec538e775443c5be79dd423fcab)
1*fa9e4066Sahrens /*
2*fa9e4066Sahrens  * CDDL HEADER START
3*fa9e4066Sahrens  *
4*fa9e4066Sahrens  * The contents of this file are subject to the terms of the
5*fa9e4066Sahrens  * Common Development and Distribution License, Version 1.0 only
6*fa9e4066Sahrens  * (the "License").  You may not use this file except in compliance
7*fa9e4066Sahrens  * with the License.
8*fa9e4066Sahrens  *
9*fa9e4066Sahrens  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10*fa9e4066Sahrens  * or http://www.opensolaris.org/os/licensing.
11*fa9e4066Sahrens  * See the License for the specific language governing permissions
12*fa9e4066Sahrens  * and limitations under the License.
13*fa9e4066Sahrens  *
14*fa9e4066Sahrens  * When distributing Covered Code, include this CDDL HEADER in each
15*fa9e4066Sahrens  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16*fa9e4066Sahrens  * If applicable, add the following below this CDDL HEADER, with the
17*fa9e4066Sahrens  * fields enclosed by brackets "[]" replaced with your own identifying
18*fa9e4066Sahrens  * information: Portions Copyright [yyyy] [name of copyright owner]
19*fa9e4066Sahrens  *
20*fa9e4066Sahrens  * CDDL HEADER END
21*fa9e4066Sahrens  */
22*fa9e4066Sahrens /*
23*fa9e4066Sahrens  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
24*fa9e4066Sahrens  * Use is subject to license terms.
25*fa9e4066Sahrens  */
26*fa9e4066Sahrens 
27*fa9e4066Sahrens #pragma ident	"%Z%%M%	%I%	%E% SMI"
28*fa9e4066Sahrens 
29*fa9e4066Sahrens #include <sys/zfs_context.h>
30*fa9e4066Sahrens #include <sys/txg_impl.h>
31*fa9e4066Sahrens #include <sys/dmu_impl.h>
32*fa9e4066Sahrens #include <sys/dsl_pool.h>
33*fa9e4066Sahrens #include <sys/callb.h>
34*fa9e4066Sahrens 
35*fa9e4066Sahrens /*
36*fa9e4066Sahrens  * Pool-wide transaction groups.
37*fa9e4066Sahrens  */
38*fa9e4066Sahrens 
39*fa9e4066Sahrens static void txg_sync_thread(dsl_pool_t *dp);
40*fa9e4066Sahrens static void txg_quiesce_thread(dsl_pool_t *dp);
41*fa9e4066Sahrens static void txg_timelimit_thread(dsl_pool_t *dp);
42*fa9e4066Sahrens 
43*fa9e4066Sahrens int txg_time = 5;	/* max 5 seconds worth of delta per txg */
44*fa9e4066Sahrens 
45*fa9e4066Sahrens /*
46*fa9e4066Sahrens  * Prepare the txg subsystem.
47*fa9e4066Sahrens  */
48*fa9e4066Sahrens void
49*fa9e4066Sahrens txg_init(dsl_pool_t *dp, uint64_t txg)
50*fa9e4066Sahrens {
51*fa9e4066Sahrens 	tx_state_t *tx = &dp->dp_tx;
52*fa9e4066Sahrens 
53*fa9e4066Sahrens 	bzero(tx, sizeof (tx_state_t));
54*fa9e4066Sahrens 
55*fa9e4066Sahrens 	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
56*fa9e4066Sahrens 
57*fa9e4066Sahrens 	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
58*fa9e4066Sahrens 
59*fa9e4066Sahrens 	tx->tx_open_txg = txg;
60*fa9e4066Sahrens }
61*fa9e4066Sahrens 
62*fa9e4066Sahrens /*
63*fa9e4066Sahrens  * Close down the txg subsystem.
64*fa9e4066Sahrens  */
65*fa9e4066Sahrens void
66*fa9e4066Sahrens txg_fini(dsl_pool_t *dp)
67*fa9e4066Sahrens {
68*fa9e4066Sahrens 	tx_state_t *tx = &dp->dp_tx;
69*fa9e4066Sahrens 
70*fa9e4066Sahrens 	ASSERT(tx->tx_threads == 0);
71*fa9e4066Sahrens 
72*fa9e4066Sahrens 	rw_destroy(&tx->tx_suspend);
73*fa9e4066Sahrens 
74*fa9e4066Sahrens 	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
75*fa9e4066Sahrens 
76*fa9e4066Sahrens 	bzero(tx, sizeof (tx_state_t));
77*fa9e4066Sahrens }
78*fa9e4066Sahrens 
79*fa9e4066Sahrens /*
80*fa9e4066Sahrens  * Start syncing transaction groups.
81*fa9e4066Sahrens  */
82*fa9e4066Sahrens void
83*fa9e4066Sahrens txg_sync_start(dsl_pool_t *dp)
84*fa9e4066Sahrens {
85*fa9e4066Sahrens 	tx_state_t *tx = &dp->dp_tx;
86*fa9e4066Sahrens 
87*fa9e4066Sahrens 	mutex_enter(&tx->tx_sync_lock);
88*fa9e4066Sahrens 
89*fa9e4066Sahrens 	dprintf("pool %p\n", dp);
90*fa9e4066Sahrens 
91*fa9e4066Sahrens 	ASSERT(tx->tx_threads == 0);
92*fa9e4066Sahrens 
93*fa9e4066Sahrens 	tx->tx_threads = 3;
94*fa9e4066Sahrens 
95*fa9e4066Sahrens 	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
96*fa9e4066Sahrens 	    dp, 0, &p0, TS_RUN, minclsyspri);
97*fa9e4066Sahrens 
98*fa9e4066Sahrens 	tx->tx_sync_thread = thread_create(NULL, 0, txg_sync_thread,
99*fa9e4066Sahrens 	    dp, 0, &p0, TS_RUN, minclsyspri);
100*fa9e4066Sahrens 
101*fa9e4066Sahrens 	tx->tx_timelimit_thread = thread_create(NULL, 0, txg_timelimit_thread,
102*fa9e4066Sahrens 	    dp, 0, &p0, TS_RUN, minclsyspri);
103*fa9e4066Sahrens 
104*fa9e4066Sahrens 	mutex_exit(&tx->tx_sync_lock);
105*fa9e4066Sahrens }
106*fa9e4066Sahrens 
107*fa9e4066Sahrens static void
108*fa9e4066Sahrens txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
109*fa9e4066Sahrens {
110*fa9e4066Sahrens 	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
111*fa9e4066Sahrens 	mutex_enter(&tx->tx_sync_lock);
112*fa9e4066Sahrens }
113*fa9e4066Sahrens 
114*fa9e4066Sahrens static void
115*fa9e4066Sahrens txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
116*fa9e4066Sahrens {
117*fa9e4066Sahrens 	ASSERT(*tpp != NULL);
118*fa9e4066Sahrens 	*tpp = NULL;
119*fa9e4066Sahrens 	tx->tx_threads--;
120*fa9e4066Sahrens 	cv_broadcast(&tx->tx_exit_cv);
121*fa9e4066Sahrens 	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
122*fa9e4066Sahrens 	thread_exit();
123*fa9e4066Sahrens }
124*fa9e4066Sahrens 
125*fa9e4066Sahrens static void
126*fa9e4066Sahrens txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, int secmax)
127*fa9e4066Sahrens {
128*fa9e4066Sahrens 	CALLB_CPR_SAFE_BEGIN(cpr);
129*fa9e4066Sahrens 
130*fa9e4066Sahrens 	if (secmax)
131*fa9e4066Sahrens 		(void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + secmax * hz);
132*fa9e4066Sahrens 	else
133*fa9e4066Sahrens 		cv_wait(cv, &tx->tx_sync_lock);
134*fa9e4066Sahrens 
135*fa9e4066Sahrens 	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
136*fa9e4066Sahrens }
137*fa9e4066Sahrens 
138*fa9e4066Sahrens /*
139*fa9e4066Sahrens  * Stop syncing transaction groups.
140*fa9e4066Sahrens  */
141*fa9e4066Sahrens void
142*fa9e4066Sahrens txg_sync_stop(dsl_pool_t *dp)
143*fa9e4066Sahrens {
144*fa9e4066Sahrens 	tx_state_t *tx = &dp->dp_tx;
145*fa9e4066Sahrens 
146*fa9e4066Sahrens 	dprintf("pool %p\n", dp);
147*fa9e4066Sahrens 	/*
148*fa9e4066Sahrens 	 * Finish off any work in progress.
149*fa9e4066Sahrens 	 */
150*fa9e4066Sahrens 	ASSERT(tx->tx_threads == 3);
151*fa9e4066Sahrens 	txg_wait_synced(dp, 0);
152*fa9e4066Sahrens 
153*fa9e4066Sahrens 	/*
154*fa9e4066Sahrens 	 * Wake all 3 sync threads (one per state) and wait for them to die.
155*fa9e4066Sahrens 	 */
156*fa9e4066Sahrens 	mutex_enter(&tx->tx_sync_lock);
157*fa9e4066Sahrens 
158*fa9e4066Sahrens 	ASSERT(tx->tx_threads == 3);
159*fa9e4066Sahrens 
160*fa9e4066Sahrens 	tx->tx_exiting = 1;
161*fa9e4066Sahrens 
162*fa9e4066Sahrens 	cv_broadcast(&tx->tx_quiesce_more_cv);
163*fa9e4066Sahrens 	cv_broadcast(&tx->tx_quiesce_done_cv);
164*fa9e4066Sahrens 	cv_broadcast(&tx->tx_sync_more_cv);
165*fa9e4066Sahrens 	cv_broadcast(&tx->tx_timeout_exit_cv);
166*fa9e4066Sahrens 
167*fa9e4066Sahrens 	while (tx->tx_threads != 0)
168*fa9e4066Sahrens 		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
169*fa9e4066Sahrens 
170*fa9e4066Sahrens 	tx->tx_exiting = 0;
171*fa9e4066Sahrens 
172*fa9e4066Sahrens 	mutex_exit(&tx->tx_sync_lock);
173*fa9e4066Sahrens }
174*fa9e4066Sahrens 
175*fa9e4066Sahrens uint64_t
176*fa9e4066Sahrens txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
177*fa9e4066Sahrens {
178*fa9e4066Sahrens 	tx_state_t *tx = &dp->dp_tx;
179*fa9e4066Sahrens 	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
180*fa9e4066Sahrens 	uint64_t txg;
181*fa9e4066Sahrens 
182*fa9e4066Sahrens 	mutex_enter(&tc->tc_lock);
183*fa9e4066Sahrens 
184*fa9e4066Sahrens 	txg = tx->tx_open_txg;
185*fa9e4066Sahrens 	tc->tc_count[txg & TXG_MASK]++;
186*fa9e4066Sahrens 
187*fa9e4066Sahrens 	th->th_cpu = tc;
188*fa9e4066Sahrens 	th->th_txg = txg;
189*fa9e4066Sahrens 
190*fa9e4066Sahrens 	return (txg);
191*fa9e4066Sahrens }
192*fa9e4066Sahrens 
193*fa9e4066Sahrens void
194*fa9e4066Sahrens txg_rele_to_quiesce(txg_handle_t *th)
195*fa9e4066Sahrens {
196*fa9e4066Sahrens 	tx_cpu_t *tc = th->th_cpu;
197*fa9e4066Sahrens 
198*fa9e4066Sahrens 	mutex_exit(&tc->tc_lock);
199*fa9e4066Sahrens }
200*fa9e4066Sahrens 
201*fa9e4066Sahrens void
202*fa9e4066Sahrens txg_rele_to_sync(txg_handle_t *th)
203*fa9e4066Sahrens {
204*fa9e4066Sahrens 	tx_cpu_t *tc = th->th_cpu;
205*fa9e4066Sahrens 	int g = th->th_txg & TXG_MASK;
206*fa9e4066Sahrens 
207*fa9e4066Sahrens 	mutex_enter(&tc->tc_lock);
208*fa9e4066Sahrens 	ASSERT(tc->tc_count[g] != 0);
209*fa9e4066Sahrens 	if (--tc->tc_count[g] == 0)
210*fa9e4066Sahrens 		cv_broadcast(&tc->tc_cv[g]);
211*fa9e4066Sahrens 	mutex_exit(&tc->tc_lock);
212*fa9e4066Sahrens 
213*fa9e4066Sahrens 	th->th_cpu = NULL;	/* defensive */
214*fa9e4066Sahrens }
215*fa9e4066Sahrens 
216*fa9e4066Sahrens static void
217*fa9e4066Sahrens txg_quiesce(dsl_pool_t *dp, uint64_t txg)
218*fa9e4066Sahrens {
219*fa9e4066Sahrens 	tx_state_t *tx = &dp->dp_tx;
220*fa9e4066Sahrens 	int g = txg & TXG_MASK;
221*fa9e4066Sahrens 	int c;
222*fa9e4066Sahrens 
223*fa9e4066Sahrens 	/*
224*fa9e4066Sahrens 	 * Grab all tx_cpu locks so nobody else can get into this txg.
225*fa9e4066Sahrens 	 */
226*fa9e4066Sahrens 	for (c = 0; c < max_ncpus; c++)
227*fa9e4066Sahrens 		mutex_enter(&tx->tx_cpu[c].tc_lock);
228*fa9e4066Sahrens 
229*fa9e4066Sahrens 	ASSERT(txg == tx->tx_open_txg);
230*fa9e4066Sahrens 	tx->tx_open_txg++;
231*fa9e4066Sahrens 
232*fa9e4066Sahrens 	/*
233*fa9e4066Sahrens 	 * Now that we've incremented tx_open_txg, we can let threads
234*fa9e4066Sahrens 	 * enter the next transaction group.
235*fa9e4066Sahrens 	 */
236*fa9e4066Sahrens 	for (c = 0; c < max_ncpus; c++)
237*fa9e4066Sahrens 		mutex_exit(&tx->tx_cpu[c].tc_lock);
238*fa9e4066Sahrens 
239*fa9e4066Sahrens 	/*
240*fa9e4066Sahrens 	 * Quiesce the transaction group by waiting for everyone to txg_exit().
241*fa9e4066Sahrens 	 */
242*fa9e4066Sahrens 	for (c = 0; c < max_ncpus; c++) {
243*fa9e4066Sahrens 		tx_cpu_t *tc = &tx->tx_cpu[c];
244*fa9e4066Sahrens 		mutex_enter(&tc->tc_lock);
245*fa9e4066Sahrens 		while (tc->tc_count[g] != 0)
246*fa9e4066Sahrens 			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
247*fa9e4066Sahrens 		mutex_exit(&tc->tc_lock);
248*fa9e4066Sahrens 	}
249*fa9e4066Sahrens }
250*fa9e4066Sahrens 
251*fa9e4066Sahrens static void
252*fa9e4066Sahrens txg_sync_thread(dsl_pool_t *dp)
253*fa9e4066Sahrens {
254*fa9e4066Sahrens 	tx_state_t *tx = &dp->dp_tx;
255*fa9e4066Sahrens 	callb_cpr_t cpr;
256*fa9e4066Sahrens 
257*fa9e4066Sahrens 	txg_thread_enter(tx, &cpr);
258*fa9e4066Sahrens 
259*fa9e4066Sahrens 	for (;;) {
260*fa9e4066Sahrens 		uint64_t txg;
261*fa9e4066Sahrens 
262*fa9e4066Sahrens 		/*
263*fa9e4066Sahrens 		 * We sync when there's someone waiting on us, or the
264*fa9e4066Sahrens 		 * quiesce thread has handed off a txg to us.
265*fa9e4066Sahrens 		 */
266*fa9e4066Sahrens 		while (!tx->tx_exiting &&
267*fa9e4066Sahrens 		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
268*fa9e4066Sahrens 		    tx->tx_quiesced_txg == 0) {
269*fa9e4066Sahrens 			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
270*fa9e4066Sahrens 			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
271*fa9e4066Sahrens 			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, 0);
272*fa9e4066Sahrens 		}
273*fa9e4066Sahrens 
274*fa9e4066Sahrens 		/*
275*fa9e4066Sahrens 		 * Wait until the quiesce thread hands off a txg to us,
276*fa9e4066Sahrens 		 * prompting it to do so if necessary.
277*fa9e4066Sahrens 		 */
278*fa9e4066Sahrens 		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
279*fa9e4066Sahrens 			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
280*fa9e4066Sahrens 				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
281*fa9e4066Sahrens 			cv_broadcast(&tx->tx_quiesce_more_cv);
282*fa9e4066Sahrens 			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
283*fa9e4066Sahrens 		}
284*fa9e4066Sahrens 
285*fa9e4066Sahrens 		if (tx->tx_exiting)
286*fa9e4066Sahrens 			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
287*fa9e4066Sahrens 
288*fa9e4066Sahrens 		rw_enter(&tx->tx_suspend, RW_WRITER);
289*fa9e4066Sahrens 
290*fa9e4066Sahrens 		/*
291*fa9e4066Sahrens 		 * Consume the quiesced txg which has been handed off to
292*fa9e4066Sahrens 		 * us.  This may cause the quiescing thread to now be
293*fa9e4066Sahrens 		 * able to quiesce another txg, so we must signal it.
294*fa9e4066Sahrens 		 */
295*fa9e4066Sahrens 		txg = tx->tx_quiesced_txg;
296*fa9e4066Sahrens 		tx->tx_quiesced_txg = 0;
297*fa9e4066Sahrens 		tx->tx_syncing_txg = txg;
298*fa9e4066Sahrens 		cv_broadcast(&tx->tx_quiesce_more_cv);
299*fa9e4066Sahrens 		rw_exit(&tx->tx_suspend);
300*fa9e4066Sahrens 
301*fa9e4066Sahrens 		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
302*fa9e4066Sahrens 			txg, tx->tx_quiesce_txg_waiting,
303*fa9e4066Sahrens 			tx->tx_sync_txg_waiting);
304*fa9e4066Sahrens 		mutex_exit(&tx->tx_sync_lock);
305*fa9e4066Sahrens 		spa_sync(dp->dp_spa, txg);
306*fa9e4066Sahrens 		mutex_enter(&tx->tx_sync_lock);
307*fa9e4066Sahrens 		rw_enter(&tx->tx_suspend, RW_WRITER);
308*fa9e4066Sahrens 		tx->tx_synced_txg = txg;
309*fa9e4066Sahrens 		tx->tx_syncing_txg = 0;
310*fa9e4066Sahrens 		rw_exit(&tx->tx_suspend);
311*fa9e4066Sahrens 		cv_broadcast(&tx->tx_sync_done_cv);
312*fa9e4066Sahrens 	}
313*fa9e4066Sahrens }
314*fa9e4066Sahrens 
315*fa9e4066Sahrens static void
316*fa9e4066Sahrens txg_quiesce_thread(dsl_pool_t *dp)
317*fa9e4066Sahrens {
318*fa9e4066Sahrens 	tx_state_t *tx = &dp->dp_tx;
319*fa9e4066Sahrens 	callb_cpr_t cpr;
320*fa9e4066Sahrens 
321*fa9e4066Sahrens 	txg_thread_enter(tx, &cpr);
322*fa9e4066Sahrens 
323*fa9e4066Sahrens 	for (;;) {
324*fa9e4066Sahrens 		uint64_t txg;
325*fa9e4066Sahrens 
326*fa9e4066Sahrens 		/*
327*fa9e4066Sahrens 		 * We quiesce when there's someone waiting on us.
328*fa9e4066Sahrens 		 * However, we can only have one txg in "quiescing" or
329*fa9e4066Sahrens 		 * "quiesced, waiting to sync" state.  So we wait until
330*fa9e4066Sahrens 		 * the "quiesced, waiting to sync" txg has been consumed
331*fa9e4066Sahrens 		 * by the sync thread.
332*fa9e4066Sahrens 		 */
333*fa9e4066Sahrens 		while (!tx->tx_exiting &&
334*fa9e4066Sahrens 		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
335*fa9e4066Sahrens 		    tx->tx_quiesced_txg != 0))
336*fa9e4066Sahrens 			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
337*fa9e4066Sahrens 
338*fa9e4066Sahrens 		if (tx->tx_exiting)
339*fa9e4066Sahrens 			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
340*fa9e4066Sahrens 
341*fa9e4066Sahrens 		txg = tx->tx_open_txg;
342*fa9e4066Sahrens 		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
343*fa9e4066Sahrens 		    txg, tx->tx_quiesce_txg_waiting,
344*fa9e4066Sahrens 		    tx->tx_sync_txg_waiting);
345*fa9e4066Sahrens 		mutex_exit(&tx->tx_sync_lock);
346*fa9e4066Sahrens 		txg_quiesce(dp, txg);
347*fa9e4066Sahrens 		mutex_enter(&tx->tx_sync_lock);
348*fa9e4066Sahrens 
349*fa9e4066Sahrens 		/*
350*fa9e4066Sahrens 		 * Hand this txg off to the sync thread.
351*fa9e4066Sahrens 		 */
352*fa9e4066Sahrens 		dprintf("quiesce done, handing off txg %llu\n", txg);
353*fa9e4066Sahrens 		tx->tx_quiesced_txg = txg;
354*fa9e4066Sahrens 		cv_broadcast(&tx->tx_sync_more_cv);
355*fa9e4066Sahrens 		cv_broadcast(&tx->tx_quiesce_done_cv);
356*fa9e4066Sahrens 	}
357*fa9e4066Sahrens }
358*fa9e4066Sahrens 
359*fa9e4066Sahrens void
360*fa9e4066Sahrens txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
361*fa9e4066Sahrens {
362*fa9e4066Sahrens 	tx_state_t *tx = &dp->dp_tx;
363*fa9e4066Sahrens 
364*fa9e4066Sahrens 	mutex_enter(&tx->tx_sync_lock);
365*fa9e4066Sahrens 	ASSERT(tx->tx_threads == 3);
366*fa9e4066Sahrens 	if (txg == 0)
367*fa9e4066Sahrens 		txg = tx->tx_open_txg;
368*fa9e4066Sahrens 	if (tx->tx_sync_txg_waiting < txg)
369*fa9e4066Sahrens 		tx->tx_sync_txg_waiting = txg;
370*fa9e4066Sahrens 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
371*fa9e4066Sahrens 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
372*fa9e4066Sahrens 	while (tx->tx_synced_txg < txg) {
373*fa9e4066Sahrens 		dprintf("broadcasting sync more "
374*fa9e4066Sahrens 		    "tx_synced=%llu waiting=%llu dp=%p\n",
375*fa9e4066Sahrens 		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
376*fa9e4066Sahrens 		cv_broadcast(&tx->tx_sync_more_cv);
377*fa9e4066Sahrens 		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
378*fa9e4066Sahrens 	}
379*fa9e4066Sahrens 	mutex_exit(&tx->tx_sync_lock);
380*fa9e4066Sahrens }
381*fa9e4066Sahrens 
382*fa9e4066Sahrens void
383*fa9e4066Sahrens txg_wait_open(dsl_pool_t *dp, uint64_t txg)
384*fa9e4066Sahrens {
385*fa9e4066Sahrens 	tx_state_t *tx = &dp->dp_tx;
386*fa9e4066Sahrens 
387*fa9e4066Sahrens 	mutex_enter(&tx->tx_sync_lock);
388*fa9e4066Sahrens 	ASSERT(tx->tx_threads == 3);
389*fa9e4066Sahrens 	if (txg == 0)
390*fa9e4066Sahrens 		txg = tx->tx_open_txg + 1;
391*fa9e4066Sahrens 	if (tx->tx_quiesce_txg_waiting < txg)
392*fa9e4066Sahrens 		tx->tx_quiesce_txg_waiting = txg;
393*fa9e4066Sahrens 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
394*fa9e4066Sahrens 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
395*fa9e4066Sahrens 	while (tx->tx_open_txg < txg) {
396*fa9e4066Sahrens 		cv_broadcast(&tx->tx_quiesce_more_cv);
397*fa9e4066Sahrens 		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
398*fa9e4066Sahrens 	}
399*fa9e4066Sahrens 	mutex_exit(&tx->tx_sync_lock);
400*fa9e4066Sahrens }
401*fa9e4066Sahrens 
402*fa9e4066Sahrens static void
403*fa9e4066Sahrens txg_timelimit_thread(dsl_pool_t *dp)
404*fa9e4066Sahrens {
405*fa9e4066Sahrens 	tx_state_t *tx = &dp->dp_tx;
406*fa9e4066Sahrens 	callb_cpr_t cpr;
407*fa9e4066Sahrens 
408*fa9e4066Sahrens 	txg_thread_enter(tx, &cpr);
409*fa9e4066Sahrens 
410*fa9e4066Sahrens 	while (!tx->tx_exiting) {
411*fa9e4066Sahrens 		uint64_t txg = tx->tx_open_txg + 1;
412*fa9e4066Sahrens 
413*fa9e4066Sahrens 		txg_thread_wait(tx, &cpr, &tx->tx_timeout_exit_cv, txg_time);
414*fa9e4066Sahrens 
415*fa9e4066Sahrens 		if (tx->tx_quiesce_txg_waiting < txg)
416*fa9e4066Sahrens 			tx->tx_quiesce_txg_waiting = txg;
417*fa9e4066Sahrens 
418*fa9e4066Sahrens 		while (!tx->tx_exiting && tx->tx_open_txg < txg) {
419*fa9e4066Sahrens 			dprintf("pushing out %llu\n", txg);
420*fa9e4066Sahrens 			cv_broadcast(&tx->tx_quiesce_more_cv);
421*fa9e4066Sahrens 			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
422*fa9e4066Sahrens 		}
423*fa9e4066Sahrens 	}
424*fa9e4066Sahrens 	txg_thread_exit(tx, &cpr, &tx->tx_timelimit_thread);
425*fa9e4066Sahrens }
426*fa9e4066Sahrens 
427*fa9e4066Sahrens int
428*fa9e4066Sahrens txg_stalled(dsl_pool_t *dp)
429*fa9e4066Sahrens {
430*fa9e4066Sahrens 	tx_state_t *tx = &dp->dp_tx;
431*fa9e4066Sahrens 	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
432*fa9e4066Sahrens }
433*fa9e4066Sahrens 
434*fa9e4066Sahrens void
435*fa9e4066Sahrens txg_suspend(dsl_pool_t *dp)
436*fa9e4066Sahrens {
437*fa9e4066Sahrens 	tx_state_t *tx = &dp->dp_tx;
438*fa9e4066Sahrens 	/* XXX some code paths suspend when they are already suspended! */
439*fa9e4066Sahrens 	rw_enter(&tx->tx_suspend, RW_READER);
440*fa9e4066Sahrens }
441*fa9e4066Sahrens 
442*fa9e4066Sahrens void
443*fa9e4066Sahrens txg_resume(dsl_pool_t *dp)
444*fa9e4066Sahrens {
445*fa9e4066Sahrens 	tx_state_t *tx = &dp->dp_tx;
446*fa9e4066Sahrens 	rw_exit(&tx->tx_suspend);
447*fa9e4066Sahrens }
448*fa9e4066Sahrens 
449*fa9e4066Sahrens /*
450*fa9e4066Sahrens  * Per-txg object lists.
451*fa9e4066Sahrens  */
452*fa9e4066Sahrens void
453*fa9e4066Sahrens txg_list_create(txg_list_t *tl, size_t offset)
454*fa9e4066Sahrens {
455*fa9e4066Sahrens 	int t;
456*fa9e4066Sahrens 
457*fa9e4066Sahrens 	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
458*fa9e4066Sahrens 
459*fa9e4066Sahrens 	tl->tl_offset = offset;
460*fa9e4066Sahrens 
461*fa9e4066Sahrens 	for (t = 0; t < TXG_SIZE; t++)
462*fa9e4066Sahrens 		tl->tl_head[t] = NULL;
463*fa9e4066Sahrens }
464*fa9e4066Sahrens 
465*fa9e4066Sahrens void
466*fa9e4066Sahrens txg_list_destroy(txg_list_t *tl)
467*fa9e4066Sahrens {
468*fa9e4066Sahrens 	int t;
469*fa9e4066Sahrens 
470*fa9e4066Sahrens 	for (t = 0; t < TXG_SIZE; t++)
471*fa9e4066Sahrens 		ASSERT(txg_list_empty(tl, t));
472*fa9e4066Sahrens 
473*fa9e4066Sahrens 	mutex_destroy(&tl->tl_lock);
474*fa9e4066Sahrens }
475*fa9e4066Sahrens 
476*fa9e4066Sahrens int
477*fa9e4066Sahrens txg_list_empty(txg_list_t *tl, uint64_t txg)
478*fa9e4066Sahrens {
479*fa9e4066Sahrens 	return (tl->tl_head[txg & TXG_MASK] == NULL);
480*fa9e4066Sahrens }
481*fa9e4066Sahrens 
482*fa9e4066Sahrens /*
483*fa9e4066Sahrens  * Add an entry to the list.
484*fa9e4066Sahrens  * Returns 0 if it's a new entry, 1 if it's already there.
485*fa9e4066Sahrens  */
486*fa9e4066Sahrens int
487*fa9e4066Sahrens txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
488*fa9e4066Sahrens {
489*fa9e4066Sahrens 	int t = txg & TXG_MASK;
490*fa9e4066Sahrens 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
491*fa9e4066Sahrens 	int already_on_list;
492*fa9e4066Sahrens 
493*fa9e4066Sahrens 	mutex_enter(&tl->tl_lock);
494*fa9e4066Sahrens 	already_on_list = tn->tn_member[t];
495*fa9e4066Sahrens 	if (!already_on_list) {
496*fa9e4066Sahrens 		tn->tn_member[t] = 1;
497*fa9e4066Sahrens 		tn->tn_next[t] = tl->tl_head[t];
498*fa9e4066Sahrens 		tl->tl_head[t] = tn;
499*fa9e4066Sahrens 	}
500*fa9e4066Sahrens 	mutex_exit(&tl->tl_lock);
501*fa9e4066Sahrens 
502*fa9e4066Sahrens 	return (already_on_list);
503*fa9e4066Sahrens }
504*fa9e4066Sahrens 
505*fa9e4066Sahrens /*
506*fa9e4066Sahrens  * Remove the head of the list and return it.
507*fa9e4066Sahrens  */
508*fa9e4066Sahrens void *
509*fa9e4066Sahrens txg_list_remove(txg_list_t *tl, uint64_t txg)
510*fa9e4066Sahrens {
511*fa9e4066Sahrens 	int t = txg & TXG_MASK;
512*fa9e4066Sahrens 	txg_node_t *tn;
513*fa9e4066Sahrens 	void *p = NULL;
514*fa9e4066Sahrens 
515*fa9e4066Sahrens 	mutex_enter(&tl->tl_lock);
516*fa9e4066Sahrens 	if ((tn = tl->tl_head[t]) != NULL) {
517*fa9e4066Sahrens 		p = (char *)tn - tl->tl_offset;
518*fa9e4066Sahrens 		tl->tl_head[t] = tn->tn_next[t];
519*fa9e4066Sahrens 		tn->tn_next[t] = NULL;
520*fa9e4066Sahrens 		tn->tn_member[t] = 0;
521*fa9e4066Sahrens 	}
522*fa9e4066Sahrens 	mutex_exit(&tl->tl_lock);
523*fa9e4066Sahrens 
524*fa9e4066Sahrens 	return (p);
525*fa9e4066Sahrens }
526*fa9e4066Sahrens 
527*fa9e4066Sahrens /*
528*fa9e4066Sahrens  * Remove a specific item from the list and return it.
529*fa9e4066Sahrens  */
530*fa9e4066Sahrens void *
531*fa9e4066Sahrens txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
532*fa9e4066Sahrens {
533*fa9e4066Sahrens 	int t = txg & TXG_MASK;
534*fa9e4066Sahrens 	txg_node_t *tn, **tp;
535*fa9e4066Sahrens 
536*fa9e4066Sahrens 	mutex_enter(&tl->tl_lock);
537*fa9e4066Sahrens 
538*fa9e4066Sahrens 	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
539*fa9e4066Sahrens 		if ((char *)tn - tl->tl_offset == p) {
540*fa9e4066Sahrens 			*tp = tn->tn_next[t];
541*fa9e4066Sahrens 			tn->tn_next[t] = NULL;
542*fa9e4066Sahrens 			tn->tn_member[t] = 0;
543*fa9e4066Sahrens 			mutex_exit(&tl->tl_lock);
544*fa9e4066Sahrens 			return (p);
545*fa9e4066Sahrens 		}
546*fa9e4066Sahrens 	}
547*fa9e4066Sahrens 
548*fa9e4066Sahrens 	mutex_exit(&tl->tl_lock);
549*fa9e4066Sahrens 
550*fa9e4066Sahrens 	return (NULL);
551*fa9e4066Sahrens }
552*fa9e4066Sahrens 
553*fa9e4066Sahrens int
554*fa9e4066Sahrens txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
555*fa9e4066Sahrens {
556*fa9e4066Sahrens 	int t = txg & TXG_MASK;
557*fa9e4066Sahrens 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
558*fa9e4066Sahrens 
559*fa9e4066Sahrens 	return (tn->tn_member[t]);
560*fa9e4066Sahrens }
561*fa9e4066Sahrens 
562*fa9e4066Sahrens /*
563*fa9e4066Sahrens  * Walk a txg list -- only safe if you know it's not changing.
564*fa9e4066Sahrens  */
565*fa9e4066Sahrens void *
566*fa9e4066Sahrens txg_list_head(txg_list_t *tl, uint64_t txg)
567*fa9e4066Sahrens {
568*fa9e4066Sahrens 	int t = txg & TXG_MASK;
569*fa9e4066Sahrens 	txg_node_t *tn = tl->tl_head[t];
570*fa9e4066Sahrens 
571*fa9e4066Sahrens 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
572*fa9e4066Sahrens }
573*fa9e4066Sahrens 
574*fa9e4066Sahrens void *
575*fa9e4066Sahrens txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
576*fa9e4066Sahrens {
577*fa9e4066Sahrens 	int t = txg & TXG_MASK;
578*fa9e4066Sahrens 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
579*fa9e4066Sahrens 
580*fa9e4066Sahrens 	tn = tn->tn_next[t];
581*fa9e4066Sahrens 
582*fa9e4066Sahrens 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
583*fa9e4066Sahrens }
584