1*fa9e4066Sahrens /* 2*fa9e4066Sahrens * CDDL HEADER START 3*fa9e4066Sahrens * 4*fa9e4066Sahrens * The contents of this file are subject to the terms of the 5*fa9e4066Sahrens * Common Development and Distribution License, Version 1.0 only 6*fa9e4066Sahrens * (the "License"). You may not use this file except in compliance 7*fa9e4066Sahrens * with the License. 8*fa9e4066Sahrens * 9*fa9e4066Sahrens * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10*fa9e4066Sahrens * or http://www.opensolaris.org/os/licensing. 11*fa9e4066Sahrens * See the License for the specific language governing permissions 12*fa9e4066Sahrens * and limitations under the License. 13*fa9e4066Sahrens * 14*fa9e4066Sahrens * When distributing Covered Code, include this CDDL HEADER in each 15*fa9e4066Sahrens * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16*fa9e4066Sahrens * If applicable, add the following below this CDDL HEADER, with the 17*fa9e4066Sahrens * fields enclosed by brackets "[]" replaced with your own identifying 18*fa9e4066Sahrens * information: Portions Copyright [yyyy] [name of copyright owner] 19*fa9e4066Sahrens * 20*fa9e4066Sahrens * CDDL HEADER END 21*fa9e4066Sahrens */ 22*fa9e4066Sahrens /* 23*fa9e4066Sahrens * Copyright 2005 Sun Microsystems, Inc. All rights reserved. 24*fa9e4066Sahrens * Use is subject to license terms. 25*fa9e4066Sahrens */ 26*fa9e4066Sahrens 27*fa9e4066Sahrens #pragma ident "%Z%%M% %I% %E% SMI" 28*fa9e4066Sahrens 29*fa9e4066Sahrens #include <sys/zfs_context.h> 30*fa9e4066Sahrens #include <sys/txg_impl.h> 31*fa9e4066Sahrens #include <sys/dmu_impl.h> 32*fa9e4066Sahrens #include <sys/dsl_pool.h> 33*fa9e4066Sahrens #include <sys/callb.h> 34*fa9e4066Sahrens 35*fa9e4066Sahrens /* 36*fa9e4066Sahrens * Pool-wide transaction groups. 
 */

static void txg_sync_thread(dsl_pool_t *dp);
static void txg_quiesce_thread(dsl_pool_t *dp);
static void txg_timelimit_thread(dsl_pool_t *dp);

int txg_time = 5;	/* max 5 seconds worth of delta per txg */

/*
 * Prepare the txg subsystem.
 *
 * Zeroes the pool's tx_state_t, allocates the per-CPU hold-count
 * array (one tx_cpu_t per possible CPU), initializes the suspend
 * rwlock, and records 'txg' as the initially open transaction group.
 * NOTE(review): the per-CPU tc_lock/tc_cv and the sync-lock/condvars
 * get no explicit *_init here; this appears to rely on zeroed memory
 * being a valid initial state for them — TODO confirm against the
 * platform's mutex/cv implementation.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 *
 * Caller must have already stopped the service threads (see
 * txg_sync_stop()); asserted via tx_threads == 0.  Frees the per-CPU
 * array and scrubs the state.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	ASSERT(tx->tx_threads == 0);

	rw_destroy(&tx->tx_suspend);

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	/* One service thread per stage: quiesce, sync, and timelimit. */
	tx->tx_threads = 3;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	tx->tx_sync_thread = thread_create(NULL, 0, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	tx->tx_timelimit_thread = thread_create(NULL, 0, txg_timelimit_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Common prologue for the three txg service threads: register with the
 * CPR (suspend/resume) callback framework and take tx_sync_lock, which
 * each thread then holds except while sleeping in txg_thread_wait() or
 * doing work with the lock explicitly dropped.  Pairs with
 * txg_thread_exit().
 */
static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

/*
 * Common epilogue for a txg service thread: clear its thread pointer,
 * drop the live-thread count, and wake txg_sync_stop(), which waits
 * for the count to reach zero.  Does not return.
 */
static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

/*
 * CPR-safe wait on 'cv'; caller holds tx_sync_lock.  secmax == 0 means
 * wait indefinitely, otherwise wake after at most secmax seconds.
 */
static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, int secmax)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (secmax)
		(void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + secmax * hz);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 *
 * Pushes all outstanding work through a final txg_wait_synced(), then
 * raises tx_exiting, wakes every condvar a service thread could be
 * sleeping on, and waits for all three threads to call
 * txg_thread_exit().
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 3);
	txg_wait_synced(dp, 0);

	/*
	 * Wake all 3 sync threads (one per state) and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 3);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);
	cv_broadcast(&tx->tx_timeout_exit_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Join the currently open txg: bump this CPU's hold count for it and
 * record the tx_cpu in *th so the matching release drops the same
 * counter.  Returns the open txg number, with tc->tc_lock still held;
 * the caller drops it via txg_rele_to_quiesce().
 */
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

/*
 * Drop the tc_lock acquired in txg_hold_open().  The hold count itself
 * remains until txg_rele_to_sync().
 */
void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

/*
 * Release the hold taken in txg_hold_open().  When the last hold on a
 * txg's counter drops to zero, wake txg_quiesce(), which waits on
 * tc_cv for exactly that.
 */
void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}

/*
 * Close 'txg' (which must be the currently open txg) by advancing
 * tx_open_txg, then wait for every hold on it to be released.
 */
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to txg_exit().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

/*
 * Sync thread: consumes txgs handed off by the quiesce thread (via
 * tx_quiesced_txg) and runs spa_sync() on each with tx_sync_lock
 * dropped.  The tx_suspend rwlock is write-held around both the
 * hand-off bookkeeping and the completion bookkeeping so that
 * txg_suspend() readers see a consistent syncing/synced state.
 */
static void
txg_sync_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We sync when there's someone waiting on us, or the
		 * quiesce thread has handed off a txg to us.
		 */
		while (!tx->tx_exiting &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, 0);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		rw_enter(&tx->tx_suspend, RW_WRITER);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);
		rw_exit(&tx->tx_suspend);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		/* Drop the lock across the (long) sync itself. */
		mutex_exit(&tx->tx_sync_lock);
		spa_sync(dp->dp_spa, txg);
		mutex_enter(&tx->tx_sync_lock);
		rw_enter(&tx->tx_suspend, RW_WRITER);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		rw_exit(&tx->tx_suspend);
		cv_broadcast(&tx->tx_sync_done_cv);
	}
}

/*
 * Quiesce thread: closes the open txg when someone is waiting on it
 * and the previous quiesced txg has been consumed, then hands the
 * quiesced txg to the sync thread.
 */
static void
txg_quiesce_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		/* txg_quiesce() blocks on per-CPU locks; drop sync lock. */
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}

/*
 * Block until 'txg' (or, if txg == 0, the currently open txg) has been
 * synced, nudging the sync thread via tx_sync_more_cv as needed.
 */
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 3);
	if (txg == 0)
		txg = tx->tx_open_txg;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Block until 'txg' (or, if txg == 0, the txg after the currently open
 * one) is the open txg, prompting the quiesce thread to push earlier
 * txgs out.
 */
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 3);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Timelimit thread: wakes every txg_time seconds (or when kicked via
 * tx_timeout_exit_cv at shutdown) and prods the quiesce thread until
 * the txg that was open at the start of the interval has been closed,
 * bounding how long a single txg stays open.
 */
static void
txg_timelimit_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	while (!tx->tx_exiting) {
		uint64_t txg = tx->tx_open_txg + 1;

		txg_thread_wait(tx, &cpr, &tx->tx_timeout_exit_cv, txg_time);

		if (tx->tx_quiesce_txg_waiting < txg)
			tx->tx_quiesce_txg_waiting = txg;

		while (!tx->tx_exiting && tx->tx_open_txg < txg) {
			dprintf("pushing out %llu\n", txg);
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}
	}
	txg_thread_exit(tx, &cpr, &tx->tx_timelimit_thread);
}

/*
 * Report whether someone is waiting for a txg beyond the open one,
 * i.e. the pipeline has a quiesce request it hasn't satisfied yet.
 * NOTE(review): reads shared state without tx_sync_lock — presumably
 * an intentionally racy hint; confirm callers treat it as such.
 */
int
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

/*
 * Block txg sync-state transitions by taking tx_suspend as reader;
 * the sync thread write-holds it around hand-off and completion.
 * Pairs with txg_resume().
 */
void
txg_suspend(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	/* XXX some code paths suspend when they are already suspended! */
	rw_enter(&tx->tx_suspend, RW_READER);
}

void
txg_resume(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	rw_exit(&tx->tx_suspend);
}

/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	/* Byte offset of the embedded txg_node_t within each object. */
	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

int
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}

/*
 * Add an entry to the list.
 * Returns 0 if it's a new entry, 1 if it's already there.
485*fa9e4066Sahrens */ 486*fa9e4066Sahrens int 487*fa9e4066Sahrens txg_list_add(txg_list_t *tl, void *p, uint64_t txg) 488*fa9e4066Sahrens { 489*fa9e4066Sahrens int t = txg & TXG_MASK; 490*fa9e4066Sahrens txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 491*fa9e4066Sahrens int already_on_list; 492*fa9e4066Sahrens 493*fa9e4066Sahrens mutex_enter(&tl->tl_lock); 494*fa9e4066Sahrens already_on_list = tn->tn_member[t]; 495*fa9e4066Sahrens if (!already_on_list) { 496*fa9e4066Sahrens tn->tn_member[t] = 1; 497*fa9e4066Sahrens tn->tn_next[t] = tl->tl_head[t]; 498*fa9e4066Sahrens tl->tl_head[t] = tn; 499*fa9e4066Sahrens } 500*fa9e4066Sahrens mutex_exit(&tl->tl_lock); 501*fa9e4066Sahrens 502*fa9e4066Sahrens return (already_on_list); 503*fa9e4066Sahrens } 504*fa9e4066Sahrens 505*fa9e4066Sahrens /* 506*fa9e4066Sahrens * Remove the head of the list and return it. 507*fa9e4066Sahrens */ 508*fa9e4066Sahrens void * 509*fa9e4066Sahrens txg_list_remove(txg_list_t *tl, uint64_t txg) 510*fa9e4066Sahrens { 511*fa9e4066Sahrens int t = txg & TXG_MASK; 512*fa9e4066Sahrens txg_node_t *tn; 513*fa9e4066Sahrens void *p = NULL; 514*fa9e4066Sahrens 515*fa9e4066Sahrens mutex_enter(&tl->tl_lock); 516*fa9e4066Sahrens if ((tn = tl->tl_head[t]) != NULL) { 517*fa9e4066Sahrens p = (char *)tn - tl->tl_offset; 518*fa9e4066Sahrens tl->tl_head[t] = tn->tn_next[t]; 519*fa9e4066Sahrens tn->tn_next[t] = NULL; 520*fa9e4066Sahrens tn->tn_member[t] = 0; 521*fa9e4066Sahrens } 522*fa9e4066Sahrens mutex_exit(&tl->tl_lock); 523*fa9e4066Sahrens 524*fa9e4066Sahrens return (p); 525*fa9e4066Sahrens } 526*fa9e4066Sahrens 527*fa9e4066Sahrens /* 528*fa9e4066Sahrens * Remove a specific item from the list and return it. 
529*fa9e4066Sahrens */ 530*fa9e4066Sahrens void * 531*fa9e4066Sahrens txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg) 532*fa9e4066Sahrens { 533*fa9e4066Sahrens int t = txg & TXG_MASK; 534*fa9e4066Sahrens txg_node_t *tn, **tp; 535*fa9e4066Sahrens 536*fa9e4066Sahrens mutex_enter(&tl->tl_lock); 537*fa9e4066Sahrens 538*fa9e4066Sahrens for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) { 539*fa9e4066Sahrens if ((char *)tn - tl->tl_offset == p) { 540*fa9e4066Sahrens *tp = tn->tn_next[t]; 541*fa9e4066Sahrens tn->tn_next[t] = NULL; 542*fa9e4066Sahrens tn->tn_member[t] = 0; 543*fa9e4066Sahrens mutex_exit(&tl->tl_lock); 544*fa9e4066Sahrens return (p); 545*fa9e4066Sahrens } 546*fa9e4066Sahrens } 547*fa9e4066Sahrens 548*fa9e4066Sahrens mutex_exit(&tl->tl_lock); 549*fa9e4066Sahrens 550*fa9e4066Sahrens return (NULL); 551*fa9e4066Sahrens } 552*fa9e4066Sahrens 553*fa9e4066Sahrens int 554*fa9e4066Sahrens txg_list_member(txg_list_t *tl, void *p, uint64_t txg) 555*fa9e4066Sahrens { 556*fa9e4066Sahrens int t = txg & TXG_MASK; 557*fa9e4066Sahrens txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 558*fa9e4066Sahrens 559*fa9e4066Sahrens return (tn->tn_member[t]); 560*fa9e4066Sahrens } 561*fa9e4066Sahrens 562*fa9e4066Sahrens /* 563*fa9e4066Sahrens * Walk a txg list -- only safe if you know it's not changing. 564*fa9e4066Sahrens */ 565*fa9e4066Sahrens void * 566*fa9e4066Sahrens txg_list_head(txg_list_t *tl, uint64_t txg) 567*fa9e4066Sahrens { 568*fa9e4066Sahrens int t = txg & TXG_MASK; 569*fa9e4066Sahrens txg_node_t *tn = tl->tl_head[t]; 570*fa9e4066Sahrens 571*fa9e4066Sahrens return (tn == NULL ? 
NULL : (char *)tn - tl->tl_offset); 572*fa9e4066Sahrens } 573*fa9e4066Sahrens 574*fa9e4066Sahrens void * 575*fa9e4066Sahrens txg_list_next(txg_list_t *tl, void *p, uint64_t txg) 576*fa9e4066Sahrens { 577*fa9e4066Sahrens int t = txg & TXG_MASK; 578*fa9e4066Sahrens txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 579*fa9e4066Sahrens 580*fa9e4066Sahrens tn = tn->tn_next[t]; 581*fa9e4066Sahrens 582*fa9e4066Sahrens return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); 583*fa9e4066Sahrens } 584