Skip to content

Commit

Permalink
persist all transactions and effects during checkpoint construction (#…
Browse files Browse the repository at this point in the history
…20966)

This is a temporary change needed to prepare for the upgrade to data
quarantining. The data quarantining code will assume that any
uncommitted transactions will be replayed from consensus. But upon
upgrade, no replay will occur - the non-DQ build will have marked all
commits as processed immediately. This change prevents the assert at
https://github.com/MystenLabs/sui/blob/791ce9132d98cfff4874e9cff4278419e050d94c/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs#L1001
from firing during startup of a DQ build.
  • Loading branch information
mystenmark authored Jan 24, 2025
1 parent 0fd1597 commit d487c2c
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 0 deletions.
21 changes: 21 additions & 0 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,27 @@ impl AuthorityStore {
Ok(())
}

/// Writes the given transactions and their effects to the perpetual tables.
///
/// Transactions are keyed by their transaction digest and effects by their
/// effects digest. Both sets of rows are staged in one write batch, which is
/// then written with a single `write()` call. No other tables are touched.
///
/// Returns an error if staging either set of rows or writing the batch fails.
pub(crate) fn persist_transactions_and_effects(
    &self,
    transactions_and_effects: &[(VerifiedTransaction, TransactionEffects)],
) -> SuiResult {
    let tables = &self.perpetual_tables;

    // Rows for the `transactions` table: (transaction digest, serializable transaction).
    let tx_rows = transactions_and_effects
        .iter()
        .map(|(tx, _)| (*tx.digest(), tx.serializable_ref()));

    // Rows for the `effects` table: (effects digest, effects).
    let fx_rows = transactions_and_effects
        .iter()
        .map(|(_, fx)| (fx.digest(), fx.clone()));

    let mut write_batch = tables.transactions.batch();
    write_batch.insert_batch(&tables.transactions, tx_rows)?;
    write_batch.insert_batch(&tables.effects, fx_rows)?;
    write_batch.write()?;
    Ok(())
}

pub async fn acquire_transaction_locks(
&self,
epoch_store: &AuthorityPerEpochStore,
Expand Down
17 changes: 17 additions & 0 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,23 @@ impl CheckpointBuilder {
let mut all_tx_digests =
Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());

// When upgrading to a data-quarantining build, we need to persist the transactions and effects
// to the database for crash recovery. After the upgrade this is no longer needed, because recovery
// is driven by replay of consensus commits.
// This only updates the content-addressed stores (similar to state sync), it does not mark any
// transactions as executed.
self.state
.get_cache_commit()
.persist_transactions_and_effects(
&new_checkpoints
.iter()
.flat_map(|(_, c)| {
c.iter()
.map(|digests| (digests.transaction, digests.effects))
})
.collect::<Vec<_>>(),
);

for (summary, contents) in &new_checkpoints {
debug!(
checkpoint_commit_height = height,
Expand Down
10 changes: 10 additions & 0 deletions crates/sui-core/src/execution_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ pub trait ExecutionCacheCommit: Send + Sync {
/// and pending_consensus_transactions, and this method can be removed.
fn persist_transactions<'a>(&'a self, digests: &'a [TransactionDigest]) -> BoxFuture<'a, ()>;

/// Persist the given transactions and their effects to the database, without
/// writing any of the transactions' other outputs.
///
/// Each entry in `digests` pairs a transaction digest with the digest of the
/// effects expected for that transaction.
///
/// The effects are stored in the content-addressed effects store only; no
/// `executed_effects` entry is added. This is required for recovery from a
/// crash when upgrading to data-quarantining.
///
/// TODO: remove this once all nodes have upgraded to data-quarantining.
fn persist_transactions_and_effects(
&self,
digests: &[(TransactionDigest, TransactionEffectsDigest)],
);

// Number of pending uncommitted transactions
fn approximate_pending_transaction_count(&self) -> u64;
}
Expand Down
38 changes: 38 additions & 0 deletions crates/sui-core/src/execution_cache/writeback_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,37 @@ impl WritebackCache {
self.set_backpressure(pending_count);
}

/// Persists the transaction and effects of every digest pair that still has
/// pending (dirty) outputs in this cache.
///
/// Digests with no entry in `pending_transaction_writes` are logged at debug
/// level and skipped. For the rest, the pending effects digest must equal the
/// digest supplied by the caller.
///
/// # Panics
///
/// Panics if a pending transaction's effects digest differs from the supplied
/// one, or if the store reports an error while persisting.
fn persist_transactions_and_effects(
    &self,
    digests: &[(TransactionDigest, TransactionEffectsDigest)],
) {
    let mut transactions_and_effects = Vec::with_capacity(digests.len());

    transactions_and_effects.extend(digests.iter().filter_map(|(tx_digest, fx_digest)| {
        // Clone the outputs out of the map so the map guard is released immediately.
        let pending = self
            .dirty
            .pending_transaction_writes
            .get(tx_digest)
            .map(|entry| entry.clone());

        match pending {
            None => {
                debug!("transaction {:?} was already committed", tx_digest);
                None
            }
            Some(outputs) => {
                assert_eq!(
                    outputs.effects.digest(),
                    *fx_digest,
                    "effects digest mismatch"
                );
                Some(((*outputs.transaction).clone(), outputs.effects.clone()))
            }
        }
    }));

    self.store
        .persist_transactions_and_effects(&transactions_and_effects)
        .expect("db error");
}

fn approximate_pending_transaction_count(&self) -> u64 {
let num_commits = self
.dirty
Expand Down Expand Up @@ -1321,6 +1352,13 @@ impl ExecutionCacheCommit for WritebackCache {
// Trait entry point: forwards to the inherent `WritebackCache` method of the
// same name. The fully-qualified path makes it explicit that the inherent
// method is the one being called, rather than this trait method.
fn approximate_pending_transaction_count(&self) -> u64 {
WritebackCache::approximate_pending_transaction_count(self)
}

// Trait entry point: forwards `digests` unchanged to the inherent
// `WritebackCache::persist_transactions_and_effects`. The fully-qualified path
// makes it explicit that the inherent method is the one being called, rather
// than this trait method.
fn persist_transactions_and_effects(
&self,
digests: &[(TransactionDigest, TransactionEffectsDigest)],
) {
WritebackCache::persist_transactions_and_effects(self, digests);
}
}

impl ObjectCacheRead for WritebackCache {
Expand Down

0 comments on commit d487c2c

Please sign in to comment.