/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dsl_pool.h>
#include <sys/callb.h>

/*
 * Pool-wide transaction groups.
 */

static void txg_sync_thread(dsl_pool_t *dp);
static void txg_quiesce_thread(dsl_pool_t *dp);
static void txg_timelimit_thread(dsl_pool_t *dp);

int txg_time = 5;	/* max 5 seconds worth of delta per txg */

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
		}
	}

	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}
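
/*
 * Note on txg_init()'s layout: hold counts live in one tx_cpu_t per
 * possible CPU, so concurrent txg_hold_open() callers normally contend
 * only on their own CPU's tc_lock, and each counter array is TXG_SIZE
 * entries deep, indexed by (txg & TXG_MASK).  Illustrative sketch (not
 * code from this file): with TXG_SIZE == 4, txgs 8, 9, 10 and 11 map
 * to ring slots 0-3 and txg 12 reuses slot 0, which is safe because a
 * slot is only recycled after its previous txg has synced.
 */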

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	rw_destroy(&tx->tx_suspend);
	mutex_destroy(&tx->tx_sync_lock);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++)
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
	}

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 3;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	tx->tx_sync_thread = thread_create(NULL, 0, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	tx->tx_timelimit_thread = thread_create(NULL, 0, txg_timelimit_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, int secmax)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (secmax)
		(void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + secmax * hz);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}
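
/*
 * The three helpers above define the lifecycle shared by the service
 * threads: txg_thread_enter() registers with the CPR (checkpoint/resume)
 * framework and takes tx_sync_lock; txg_thread_wait() marks the thread
 * CPR-safe while it blocks on a cv (for at most 'secmax' seconds if
 * secmax is nonzero); and txg_thread_exit() decrements tx_threads,
 * wakes txg_sync_stop(), and never returns.  A hypothetical additional
 * service thread would use the same skeleton (illustrative only;
 * 'tx_example_cv' and 'tx_example_thread' are not real fields):
 *
 *	txg_thread_enter(tx, &cpr);
 *	while (!tx->tx_exiting)
 *		txg_thread_wait(tx, &cpr, &tx->tx_example_cv, 0);
 *	txg_thread_exit(tx, &cpr, &tx->tx_example_thread);
 */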

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 3);
	txg_wait_synced(dp, 0);

	/*
	 * Wake all 3 sync threads (one per state) and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 3);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);
	cv_broadcast(&tx->tx_timeout_exit_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Take a hold on the currently open txg and return its number.
 * Returns with the caller's per-CPU tc_lock held; the caller drops it
 * via txg_rele_to_quiesce() once its intent to dirty is recorded.
 */
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

/*
 * Release the hold entirely; when the last hold on a txg is dropped,
 * wake the quiesce thread waiting in txg_quiesce().
 */
void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}
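
/*
 * Typical hold/release sequence for a writer assigning work to the
 * open txg (an illustrative sketch of the caller's side, not code from
 * this file; the DMU transaction code follows this general shape):
 *
 *	txg_handle_t th;
 *	uint64_t txg;
 *
 *	txg = txg_hold_open(dp, &th);	 txg cannot quiesce past us now
 *	...reserve space, pick dirty targets...
 *	txg_rele_to_quiesce(&th);	 drops tc_lock, keeps the hold
 *	...dirty the chosen buffers in txg...
 *	txg_rele_to_sync(&th);		 txg may now quiesce
 *
 * The hold guarantees that txg_quiesce() cannot complete, and hence
 * spa_sync() cannot begin, until the caller is finished with the txg.
 */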

static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to
	 * call txg_rele_to_sync().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}
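
/*
 * txg_quiesce() is the barrier between the "open" and "syncing" halves
 * of the pipeline: holding every tc_lock at once keeps new holds out
 * of the old txg, bumping tx_open_txg redirects them to the next one,
 * and the per-CPU waits drain the holds that got in before the
 * barrier.  The design choice is that ordinary writers only ever take
 * their own CPU's tc_lock, so this all-CPUs sweep happens once per txg
 * rather than once per hold.
 */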

static void
txg_sync_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We sync when there's someone waiting on us, or the
		 * quiesce thread has handed off a txg to us.
		 */
		while (!tx->tx_exiting &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, 0);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		rw_enter(&tx->tx_suspend, RW_WRITER);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);
		rw_exit(&tx->tx_suspend);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		spa_sync(dp->dp_spa, txg);
		mutex_enter(&tx->tx_sync_lock);
		rw_enter(&tx->tx_suspend, RW_WRITER);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		rw_exit(&tx->tx_suspend);
		cv_broadcast(&tx->tx_sync_done_cv);
	}
}

static void
txg_quiesce_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}
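
/*
 * Together, the two threads above drive a txg through its states.
 * At most one txg occupies each state at any time:
 *
 *	open		accepting new holds (tx_open_txg)
 *	quiescing	waiting for outstanding holds to be released
 *	quiesced	handed off in tx_quiesced_txg, awaiting sync
 *	syncing		being written out by spa_sync() (tx_syncing_txg)
 *	synced		on stable storage (tx_synced_txg)
 *
 * tx_quiesced_txg therefore acts as a one-entry queue between the
 * quiesce thread (producer) and the sync thread (consumer).
 */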

/*
 * Wait until the given txg (or, if txg == 0, the currently open txg)
 * has been synced to disk.
 */
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 3);
	if (txg == 0)
		txg = tx->tx_open_txg;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Wait until the given txg (or, if txg == 0, the txg after the one
 * currently open) becomes the open txg.
 */
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 3);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Nudge the quiesce thread every txg_time seconds so that no txg stays
 * open longer than that, even when the pool is otherwise idle.
 */
static void
txg_timelimit_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	while (!tx->tx_exiting) {
		uint64_t txg = tx->tx_open_txg + 1;

		txg_thread_wait(tx, &cpr, &tx->tx_timeout_exit_cv, txg_time);

		if (tx->tx_quiesce_txg_waiting < txg)
			tx->tx_quiesce_txg_waiting = txg;

		while (!tx->tx_exiting && tx->tx_open_txg < txg) {
			dprintf("pushing out %llu\n", txg);
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}
	}
	txg_thread_exit(tx, &cpr, &tx->tx_timelimit_thread);
}

/*
 * Report whether a txg is waiting to quiesce but cannot yet do so.
 */
int
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}
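
/*
 * Callers commonly pass txg == 0 to the waiters above:
 * txg_wait_synced(dp, 0) means "push everything open right now to
 * stable storage", which is how synchronous semantics are layered on
 * top of the pipeline, and txg_wait_open(dp, 0) means "stall me until
 * the next txg opens", a simple form of throttling.  Both require the
 * three service threads to be running, hence their ASSERTs.
 */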

/*
 * Suspend the sync pipeline: while tx_suspend is held as reader, the
 * sync thread cannot take it as writer to start or finish a txg.
 */
void
txg_suspend(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	/* XXX some code paths suspend when they are already suspended! */
	rw_enter(&tx->tx_suspend, RW_READER);
}

void
txg_resume(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	rw_exit(&tx->tx_suspend);
}

/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

int
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}

/*
 * Add an entry to the list.
 * Returns 0 if it's a new entry, 1 if it's already there.
 */
int
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	int already_on_list;

	mutex_enter(&tl->tl_lock);
	already_on_list = tn->tn_member[t];
	if (!already_on_list) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (already_on_list);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		p = (char *)tn - tl->tl_offset;
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}
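
/*
 * A txg_list_t links objects through a txg_node_t embedded in the
 * object itself, located via the byte offset given at create time.
 * Illustrative (hypothetical) usage -- 'my_obj_t' and 'mo_node' are
 * not types from this code base:
 *
 *	typedef struct my_obj {
 *		...
 *		txg_node_t mo_node;
 *	} my_obj_t;
 *
 *	txg_list_create(&list, offsetof(my_obj_t, mo_node));
 *	(void) txg_list_add(&list, obj, txg);
 *
 * Because tn_member[] and tn_next[] are per txg slot, one embedded
 * node lets the same object appear on the list for several txgs at
 * once without any allocation.
 */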

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

int
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	return (tn->tn_member[t]);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = tl->tl_head[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	tn = tn->tn_next[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}
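
/*
 * Illustrative walk of a quiescent list using the accessors above
 * (sketch only):
 *
 *	void *p;
 *
 *	for (p = txg_list_head(tl, txg); p != NULL;
 *	    p = txg_list_next(tl, p, txg))
 *		...examine p...
 */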