xref: /illumos-gate/usr/src/uts/common/fs/zfs/txg.c (revision 468c413a79615e77179e8d98f22a7e513a8135bd)
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/callb.h>

/*
 * Pool-wide transaction groups.
 */

static void txg_sync_thread(dsl_pool_t *dp);
static void txg_quiesce_thread(dsl_pool_t *dp);

int zfs_txg_timeout = 30;	/* max seconds worth of delta per txg */

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
			list_create(&tx->tx_cpu[c].tc_callbacks[i],
			    sizeof (dmu_tx_callback_t),
			    offsetof(dmu_tx_callback_t, dcb_node));
		}
	}

	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	rw_destroy(&tx->tx_suspend);
	mutex_destroy(&tx->tx_sync_lock);

	cv_destroy(&tx->tx_sync_more_cv);
	cv_destroy(&tx->tx_sync_done_cv);
	cv_destroy(&tx->tx_quiesce_more_cv);
	cv_destroy(&tx->tx_quiesce_done_cv);
	cv_destroy(&tx->tx_exit_cv);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
		}
	}

	if (tx->tx_commit_cb_taskq != NULL)
		taskq_destroy(tx->tx_commit_cb_taskq);

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time)
		(void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + time);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);

	/*
	 * We need to ensure that we've vacated the deferred space_maps.
	 */
	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

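/*
 * Get a handle on the currently open txg.  The handle is returned with
 * the per-CPU tc_lock still held, so the open txg cannot be quiesced
 * until the caller drops the lock with txg_rele_to_quiesce(); the hold
 * count taken here then keeps txg_quiesce() from completing until the
 * handle is released with txg_rele_to_sync().  A rough sketch of the
 * protocol (the DMU transaction code is the real consumer):
 *
 *	txg_handle_t th;
 *	uint64_t txg = txg_hold_open(dp, &th);
 *	txg_rele_to_quiesce(&th);		(drops tc_lock)
 *	... dirty data structures for this txg ...
 *	txg_rele_to_sync(&th);			(drops the hold)
 */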
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

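/*
 * Release the tc_lock taken by txg_hold_open(), allowing the open txg
 * to be quiesced; the hold itself remains until txg_rele_to_sync().
 */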
void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

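/*
 * Move the caller's commit callbacks onto the per-CPU callback list for
 * the txg named by this handle; txg_dispatch_callbacks() runs them once
 * that txg has synced.
 */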
void
txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
	mutex_exit(&tc->tc_lock);
}

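/*
 * Release the hold taken by txg_hold_open().  When the last hold on a
 * txg is dropped, the quiesce thread waiting in txg_quiesce() is woken.
 */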
void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}

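/*
 * Quiesce the currently open txg: advance tx_open_txg so that new holds
 * land in the next txg, then wait for every outstanding hold on this
 * txg to be released via txg_rele_to_sync().
 */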
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone
	 * to call txg_rele_to_sync().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

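/*
 * Taskq callback: run and then free one batch of commit callbacks that
 * txg_dispatch_callbacks() handed to the taskq.
 */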
static void
txg_do_callbacks(list_t *cb_list)
{
	dmu_tx_do_callbacks(cb_list, 0);

	list_destroy(cb_list);

	kmem_free(cb_list, sizeof (list_t));
}

/*
 * Dispatch the commit callbacks registered on this txg to worker threads.
 */
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
	int c;
	tx_state_t *tx = &dp->dp_tx;
	list_t *cb_list;

	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		/* No need to lock tx_cpu_t at this point */

		int g = txg & TXG_MASK;

		if (list_is_empty(&tc->tc_callbacks[g]))
			continue;

		if (tx->tx_commit_cb_taskq == NULL) {
			/*
			 * Commit callback taskq hasn't been created yet.
			 */
			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
			    max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
			    TASKQ_PREPOPULATE);
		}

		cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
		list_create(cb_list, sizeof (dmu_tx_callback_t),
		    offsetof(dmu_tx_callback_t, dcb_node));

		list_move_tail(&tc->tc_callbacks[g], cb_list);

		(void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
		    txg_do_callbacks, cb_list, TQ_SLEEP);
	}
}

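/*
 * The sync thread: wait for the quiesce thread to hand off a txg
 * (prodding it when necessary), sync that txg to disk via spa_sync(),
 * and then dispatch the txg's commit callbacks.
 */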
static void
txg_sync_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		uint64_t timer, timeout = zfs_txg_timeout * hz;
		uint64_t txg;

		/*
		 * We sync when we're scrubbing, when there's someone waiting
		 * on us, when the quiesce thread has handed off a txg to us,
		 * or when we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while ((dp->dp_scrub_func == SCRUB_FUNC_NONE ||
		    spa_shutting_down(dp->dp_spa)) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = lbolt - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		rw_enter(&tx->tx_suspend, RW_WRITER);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);
		rw_exit(&tx->tx_suspend);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);

		start = lbolt;
		spa_sync(dp->dp_spa, txg);
		delta = lbolt - start;

		mutex_enter(&tx->tx_sync_lock);
		rw_enter(&tx->tx_suspend, RW_WRITER);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		rw_exit(&tx->tx_suspend);
		cv_broadcast(&tx->tx_sync_done_cv);

		/*
		 * Dispatch commit callbacks to worker threads.
		 */
		txg_dispatch_callbacks(dp, txg);
	}
}

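/*
 * The quiesce thread: whenever a later txg is wanted open, quiesce the
 * currently open txg and hand it off to the sync thread.  Only one txg
 * may be quiescing or quiesced-and-waiting-to-sync at a time.
 */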
static void
txg_quiesce_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}

/*
 * Delay this thread by 'ticks' if we are still in the open transaction
 * group and there is already a waiting txg quiescing or quiesced.  Abort
 * the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
{
	tx_state_t *tx = &dp->dp_tx;
	int timeout = lbolt + ticks;

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (lbolt < timeout &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
		    timeout);

	mutex_exit(&tx->tx_sync_lock);
}

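/*
 * Block until the given txg (or the currently open txg, if txg is 0)
 * has been synced to disk.
 */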
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

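/*
 * Block until the given txg (or the next txg, if txg is 0) is the
 * currently open txg, prodding the quiesce thread as needed.
 */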
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

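/*
 * Return B_TRUE if somebody is waiting for a txg to open that is later
 * than the one currently open, i.e. the open txg has yet to quiesce.
 */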
boolean_t
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

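/*
 * Return B_TRUE if the sync thread has (or will soon have) work to do:
 * the txg currently syncing has not yet passed the highest txg someone
 * is waiting on, or a quiesced txg is pending hand-off.
 */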
boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
	    tx->tx_quiesced_txg != 0);
}

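/*
 * Take tx_suspend as reader.  While held, the sync thread cannot enter
 * the writer-locked sections where it consumes a quiesced txg or
 * records a completed sync.
 */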
void
txg_suspend(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	/* XXX some code paths suspend when they are already suspended! */
	rw_enter(&tx->tx_suspend, RW_READER);
}

void
txg_resume(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	rw_exit(&tx->tx_suspend);
}

/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

int
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}

/*
 * Add an entry to the list.
 * Returns 0 if it's a new entry, 1 if it's already there.
 */
int
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	int already_on_list;

	mutex_enter(&tl->tl_lock);
	already_on_list = tn->tn_member[t];
	if (!already_on_list) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (already_on_list);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		p = (char *)tn - tl->tl_offset;
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

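/*
 * Return nonzero if the object is on the list for this txg.
 */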
int
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	return (tn->tn_member[t]);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = tl->tl_head[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	tn = tn->tn_next[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}
718