xref: /illumos-gate/usr/src/uts/common/fs/zfs/txg.c (revision 5ad820458efd0fdb914baff9c1447c22b819fa23)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/zfs_context.h>
29 #include <sys/txg_impl.h>
30 #include <sys/dmu_impl.h>
31 #include <sys/dsl_pool.h>
32 #include <sys/callb.h>
33 
34 /*
35  * Pool-wide transaction groups.
36  */
37 
38 static void txg_sync_thread(dsl_pool_t *dp);
39 static void txg_quiesce_thread(dsl_pool_t *dp);
40 static void txg_timelimit_thread(dsl_pool_t *dp);
41 
42 int txg_time = 5;	/* max 5 seconds worth of delta per txg */
43 
44 /*
45  * Prepare the txg subsystem.
46  */
47 void
48 txg_init(dsl_pool_t *dp, uint64_t txg)
49 {
50 	tx_state_t *tx = &dp->dp_tx;
51 	int c;
52 	bzero(tx, sizeof (tx_state_t));
53 
54 	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
55 
56 	for (c = 0; c < max_ncpus; c++)
57 		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
58 
59 	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
60 	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
61 
62 	tx->tx_open_txg = txg;
63 }
64 
65 /*
66  * Close down the txg subsystem.
67  */
68 void
69 txg_fini(dsl_pool_t *dp)
70 {
71 	tx_state_t *tx = &dp->dp_tx;
72 	int c;
73 
74 	ASSERT(tx->tx_threads == 0);
75 
76 	rw_destroy(&tx->tx_suspend);
77 	mutex_destroy(&tx->tx_sync_lock);
78 
79 	for (c = 0; c < max_ncpus; c++)
80 		mutex_destroy(&tx->tx_cpu[c].tc_lock);
81 
82 	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
83 
84 	bzero(tx, sizeof (tx_state_t));
85 }
86 
87 /*
88  * Start syncing transaction groups.
89  */
90 void
91 txg_sync_start(dsl_pool_t *dp)
92 {
93 	tx_state_t *tx = &dp->dp_tx;
94 
95 	mutex_enter(&tx->tx_sync_lock);
96 
97 	dprintf("pool %p\n", dp);
98 
99 	ASSERT(tx->tx_threads == 0);
100 
101 	tx->tx_threads = 3;
102 
103 	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
104 	    dp, 0, &p0, TS_RUN, minclsyspri);
105 
106 	tx->tx_sync_thread = thread_create(NULL, 0, txg_sync_thread,
107 	    dp, 0, &p0, TS_RUN, minclsyspri);
108 
109 	tx->tx_timelimit_thread = thread_create(NULL, 0, txg_timelimit_thread,
110 	    dp, 0, &p0, TS_RUN, minclsyspri);
111 
112 	mutex_exit(&tx->tx_sync_lock);
113 }
114 
115 static void
116 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
117 {
118 	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
119 	mutex_enter(&tx->tx_sync_lock);
120 }
121 
122 static void
123 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
124 {
125 	ASSERT(*tpp != NULL);
126 	*tpp = NULL;
127 	tx->tx_threads--;
128 	cv_broadcast(&tx->tx_exit_cv);
129 	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
130 	thread_exit();
131 }
132 
133 static void
134 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, int secmax)
135 {
136 	CALLB_CPR_SAFE_BEGIN(cpr);
137 
138 	if (secmax)
139 		(void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + secmax * hz);
140 	else
141 		cv_wait(cv, &tx->tx_sync_lock);
142 
143 	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
144 }
145 
146 /*
147  * Stop syncing transaction groups.
148  */
149 void
150 txg_sync_stop(dsl_pool_t *dp)
151 {
152 	tx_state_t *tx = &dp->dp_tx;
153 
154 	dprintf("pool %p\n", dp);
155 	/*
156 	 * Finish off any work in progress.
157 	 */
158 	ASSERT(tx->tx_threads == 3);
159 	txg_wait_synced(dp, 0);
160 
161 	/*
162 	 * Wake all 3 sync threads (one per state) and wait for them to die.
163 	 */
164 	mutex_enter(&tx->tx_sync_lock);
165 
166 	ASSERT(tx->tx_threads == 3);
167 
168 	tx->tx_exiting = 1;
169 
170 	cv_broadcast(&tx->tx_quiesce_more_cv);
171 	cv_broadcast(&tx->tx_quiesce_done_cv);
172 	cv_broadcast(&tx->tx_sync_more_cv);
173 	cv_broadcast(&tx->tx_timeout_exit_cv);
174 
175 	while (tx->tx_threads != 0)
176 		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
177 
178 	tx->tx_exiting = 0;
179 
180 	mutex_exit(&tx->tx_sync_lock);
181 }
182 
183 uint64_t
184 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
185 {
186 	tx_state_t *tx = &dp->dp_tx;
187 	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
188 	uint64_t txg;
189 
190 	mutex_enter(&tc->tc_lock);
191 
192 	txg = tx->tx_open_txg;
193 	tc->tc_count[txg & TXG_MASK]++;
194 
195 	th->th_cpu = tc;
196 	th->th_txg = txg;
197 
198 	return (txg);
199 }
200 
201 void
202 txg_rele_to_quiesce(txg_handle_t *th)
203 {
204 	tx_cpu_t *tc = th->th_cpu;
205 
206 	mutex_exit(&tc->tc_lock);
207 }
208 
209 void
210 txg_rele_to_sync(txg_handle_t *th)
211 {
212 	tx_cpu_t *tc = th->th_cpu;
213 	int g = th->th_txg & TXG_MASK;
214 
215 	mutex_enter(&tc->tc_lock);
216 	ASSERT(tc->tc_count[g] != 0);
217 	if (--tc->tc_count[g] == 0)
218 		cv_broadcast(&tc->tc_cv[g]);
219 	mutex_exit(&tc->tc_lock);
220 
221 	th->th_cpu = NULL;	/* defensive */
222 }
223 
224 static void
225 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
226 {
227 	tx_state_t *tx = &dp->dp_tx;
228 	int g = txg & TXG_MASK;
229 	int c;
230 
231 	/*
232 	 * Grab all tx_cpu locks so nobody else can get into this txg.
233 	 */
234 	for (c = 0; c < max_ncpus; c++)
235 		mutex_enter(&tx->tx_cpu[c].tc_lock);
236 
237 	ASSERT(txg == tx->tx_open_txg);
238 	tx->tx_open_txg++;
239 
240 	/*
241 	 * Now that we've incremented tx_open_txg, we can let threads
242 	 * enter the next transaction group.
243 	 */
244 	for (c = 0; c < max_ncpus; c++)
245 		mutex_exit(&tx->tx_cpu[c].tc_lock);
246 
247 	/*
248 	 * Quiesce the transaction group by waiting for everyone to txg_exit().
249 	 */
250 	for (c = 0; c < max_ncpus; c++) {
251 		tx_cpu_t *tc = &tx->tx_cpu[c];
252 		mutex_enter(&tc->tc_lock);
253 		while (tc->tc_count[g] != 0)
254 			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
255 		mutex_exit(&tc->tc_lock);
256 	}
257 }
258 
/*
 * Body of the per-pool sync thread (created by txg_sync_start()).
 *
 * Runs with tx_sync_lock held except around the spa_sync() call.  It
 * wakes when tx_sync_txg_waiting is ahead of tx_synced_txg (set by
 * txg_wait_synced()), or when the quiesce thread has handed off a txg in
 * tx_quiesced_txg.  It then makes sure a quiesced txg is available,
 * asking the quiesce thread for one if needed.  It claims that txg, calls
 * spa_sync() on it with tx_sync_lock dropped, then publishes
 * tx_synced_txg and wakes waiters on tx_sync_done_cv.  It never returns
 * normally: once tx_exiting is set it leaves through txg_thread_exit().
 */
static void
txg_sync_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	/* Returns with tx_sync_lock held. */
	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We sync when there's someone waiting on us, or the
		 * quiesce thread has handed off a txg to us.
		 */
		while (!tx->tx_exiting &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, 0);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.  Raising
		 * tx_quiesce_txg_waiting past tx_open_txg is what makes the
		 * quiesce thread's wait condition false.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		/* Does not return; drops tx_sync_lock and exits the thread. */
		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * Writer hold on tx_suspend excludes txg_suspend() readers
		 * while the quiesced/syncing bookkeeping changes.
		 */
		rw_enter(&tx->tx_suspend, RW_WRITER);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);
		rw_exit(&tx->tx_suspend);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
			txg, tx->tx_quiesce_txg_waiting,
			tx->tx_sync_txg_waiting);
		/*
		 * spa_sync() runs without tx_sync_lock so that waiters and
		 * the quiesce thread can make progress meanwhile.
		 */
		mutex_exit(&tx->tx_sync_lock);
		spa_sync(dp->dp_spa, txg);
		mutex_enter(&tx->tx_sync_lock);
		/* Publish completion, again excluding txg_suspend() readers. */
		rw_enter(&tx->tx_suspend, RW_WRITER);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		rw_exit(&tx->tx_suspend);
		/* Wake txg_wait_synced() callers so they re-check. */
		cv_broadcast(&tx->tx_sync_done_cv);
	}
}
322 
/*
 * Body of the per-pool quiesce thread (created by txg_sync_start()).
 *
 * Runs with tx_sync_lock held except while txg_quiesce() is in progress.
 * It acts when someone has asked for a txg beyond the open one
 * (tx_quiesce_txg_waiting > tx_open_txg) and the previous handoff
 * (tx_quiesced_txg) has been consumed by the sync thread.  It then
 * closes the open txg with txg_quiesce() and hands it to the sync thread
 * through tx_quiesced_txg.  It never returns normally: once tx_exiting
 * is set it leaves through txg_thread_exit().
 */
static void
txg_quiesce_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	/* Returns with tx_sync_lock held. */
	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		/* Does not return; drops tx_sync_lock and exits the thread. */
		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		/*
		 * txg_quiesce() takes the per-CPU locks and may block waiting
		 * for holders, so it runs without tx_sync_lock.
		 */
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.  tx_quiesce_done_cv
		 * also wakes txg_wait_open() and the timelimit thread, which
		 * wait for tx_open_txg to advance.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}
366 
367 void
368 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
369 {
370 	tx_state_t *tx = &dp->dp_tx;
371 
372 	mutex_enter(&tx->tx_sync_lock);
373 	ASSERT(tx->tx_threads == 3);
374 	if (txg == 0)
375 		txg = tx->tx_open_txg;
376 	if (tx->tx_sync_txg_waiting < txg)
377 		tx->tx_sync_txg_waiting = txg;
378 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
379 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
380 	while (tx->tx_synced_txg < txg) {
381 		dprintf("broadcasting sync more "
382 		    "tx_synced=%llu waiting=%llu dp=%p\n",
383 		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
384 		cv_broadcast(&tx->tx_sync_more_cv);
385 		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
386 	}
387 	mutex_exit(&tx->tx_sync_lock);
388 }
389 
390 void
391 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
392 {
393 	tx_state_t *tx = &dp->dp_tx;
394 
395 	mutex_enter(&tx->tx_sync_lock);
396 	ASSERT(tx->tx_threads == 3);
397 	if (txg == 0)
398 		txg = tx->tx_open_txg + 1;
399 	if (tx->tx_quiesce_txg_waiting < txg)
400 		tx->tx_quiesce_txg_waiting = txg;
401 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
402 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
403 	while (tx->tx_open_txg < txg) {
404 		cv_broadcast(&tx->tx_quiesce_more_cv);
405 		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
406 	}
407 	mutex_exit(&tx->tx_sync_lock);
408 }
409 
/*
 * Body of the per-pool timelimit thread (created by txg_sync_start()).
 *
 * Bounds how long a txg stays open.  Each iteration notes the txg after
 * the currently open one and sleeps up to txg_time seconds on
 * tx_timeout_exit_cv, which txg_sync_stop() broadcasts to end the sleep
 * early.  It then raises tx_quiesce_txg_waiting to that txg and prods the
 * quiesce thread until tx_open_txg has caught up.  It exits through
 * txg_thread_exit() once tx_exiting is set.
 */
static void
txg_timelimit_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	/* Returns with tx_sync_lock held. */
	txg_thread_enter(tx, &cpr);

	while (!tx->tx_exiting) {
		uint64_t txg = tx->tx_open_txg + 1;

		/* With txg_time == 0 this is an untimed wait. */
		txg_thread_wait(tx, &cpr, &tx->tx_timeout_exit_cv, txg_time);

		/*
		 * Request the push-out.  This also runs if we were woken for
		 * exit; the request is then simply left unserviced.
		 */
		if (tx->tx_quiesce_txg_waiting < txg)
			tx->tx_quiesce_txg_waiting = txg;

		while (!tx->tx_exiting && tx->tx_open_txg < txg) {
			dprintf("pushing out %llu\n", txg);
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}
	}
	/* Does not return; drops tx_sync_lock and exits the thread. */
	txg_thread_exit(tx, &cpr, &tx->tx_timelimit_thread);
}
434 
435 int
436 txg_stalled(dsl_pool_t *dp)
437 {
438 	tx_state_t *tx = &dp->dp_tx;
439 	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
440 }
441 
442 void
443 txg_suspend(dsl_pool_t *dp)
444 {
445 	tx_state_t *tx = &dp->dp_tx;
446 	/* XXX some code paths suspend when they are already suspended! */
447 	rw_enter(&tx->tx_suspend, RW_READER);
448 }
449 
450 void
451 txg_resume(dsl_pool_t *dp)
452 {
453 	tx_state_t *tx = &dp->dp_tx;
454 	rw_exit(&tx->tx_suspend);
455 }
456 
457 /*
458  * Per-txg object lists.
459  */
460 void
461 txg_list_create(txg_list_t *tl, size_t offset)
462 {
463 	int t;
464 
465 	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
466 
467 	tl->tl_offset = offset;
468 
469 	for (t = 0; t < TXG_SIZE; t++)
470 		tl->tl_head[t] = NULL;
471 }
472 
473 void
474 txg_list_destroy(txg_list_t *tl)
475 {
476 	int t;
477 
478 	for (t = 0; t < TXG_SIZE; t++)
479 		ASSERT(txg_list_empty(tl, t));
480 
481 	mutex_destroy(&tl->tl_lock);
482 }
483 
484 int
485 txg_list_empty(txg_list_t *tl, uint64_t txg)
486 {
487 	return (tl->tl_head[txg & TXG_MASK] == NULL);
488 }
489 
490 /*
491  * Add an entry to the list.
492  * Returns 0 if it's a new entry, 1 if it's already there.
493  */
494 int
495 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
496 {
497 	int t = txg & TXG_MASK;
498 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
499 	int already_on_list;
500 
501 	mutex_enter(&tl->tl_lock);
502 	already_on_list = tn->tn_member[t];
503 	if (!already_on_list) {
504 		tn->tn_member[t] = 1;
505 		tn->tn_next[t] = tl->tl_head[t];
506 		tl->tl_head[t] = tn;
507 	}
508 	mutex_exit(&tl->tl_lock);
509 
510 	return (already_on_list);
511 }
512 
513 /*
514  * Remove the head of the list and return it.
515  */
516 void *
517 txg_list_remove(txg_list_t *tl, uint64_t txg)
518 {
519 	int t = txg & TXG_MASK;
520 	txg_node_t *tn;
521 	void *p = NULL;
522 
523 	mutex_enter(&tl->tl_lock);
524 	if ((tn = tl->tl_head[t]) != NULL) {
525 		p = (char *)tn - tl->tl_offset;
526 		tl->tl_head[t] = tn->tn_next[t];
527 		tn->tn_next[t] = NULL;
528 		tn->tn_member[t] = 0;
529 	}
530 	mutex_exit(&tl->tl_lock);
531 
532 	return (p);
533 }
534 
535 /*
536  * Remove a specific item from the list and return it.
537  */
538 void *
539 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
540 {
541 	int t = txg & TXG_MASK;
542 	txg_node_t *tn, **tp;
543 
544 	mutex_enter(&tl->tl_lock);
545 
546 	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
547 		if ((char *)tn - tl->tl_offset == p) {
548 			*tp = tn->tn_next[t];
549 			tn->tn_next[t] = NULL;
550 			tn->tn_member[t] = 0;
551 			mutex_exit(&tl->tl_lock);
552 			return (p);
553 		}
554 	}
555 
556 	mutex_exit(&tl->tl_lock);
557 
558 	return (NULL);
559 }
560 
561 int
562 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
563 {
564 	int t = txg & TXG_MASK;
565 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
566 
567 	return (tn->tn_member[t]);
568 }
569 
570 /*
571  * Walk a txg list -- only safe if you know it's not changing.
572  */
573 void *
574 txg_list_head(txg_list_t *tl, uint64_t txg)
575 {
576 	int t = txg & TXG_MASK;
577 	txg_node_t *tn = tl->tl_head[t];
578 
579 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
580 }
581 
582 void *
583 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
584 {
585 	int t = txg & TXG_MASK;
586 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
587 
588 	tn = tn->tn_next[t];
589 
590 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
591 }
592