Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[State Sync] Support transaction syncing from the account bootstrap version #318

Merged
merged 2 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions execution/executor-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ pub trait ChunkExecutorTrait: Send + Sync {
verified_target_li: &LedgerInfoWithSignatures,
epoch_change_li: Option<&LedgerInfoWithSignatures>,
) -> Result<(Vec<ContractEvent>, Vec<Transaction>)>;

/// Resets the chunk executor by synchronizing state with storage.
fn reset(&self) -> Result<()>;
}

pub trait BlockExecutorTrait: Send + Sync {
Expand Down
14 changes: 7 additions & 7 deletions execution/executor/src/chunk_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ impl<V> ChunkExecutor<V> {
}
}

pub fn reset(&self) -> Result<()> {
*self.commit_queue.lock() = ChunkCommitQueue::new_from_db(&self.db.reader)?;
Ok(())
}

fn state_view(
&self,
latest_view: &ExecutedTrees,
Expand Down Expand Up @@ -167,7 +162,7 @@ impl<V: VMExecutor> ChunkExecutorTrait for ChunkExecutor<V> {
.local_synced_version(latest_view.version().unwrap_or(0))
.first_version_in_request(first_version_in_request)
.num_txns_in_request(num_txns),
"sync_request_executed",
"Executed transaction chunk!",
);

Ok(())
Expand Down Expand Up @@ -217,7 +212,7 @@ impl<V: VMExecutor> ChunkExecutorTrait for ChunkExecutor<V> {
.local_synced_version(latest_view.version().unwrap_or(0))
.first_version_in_request(first_version_in_request)
.num_txns_in_request(num_txns),
"sync_request_applied",
"Applied transaction output chunk!",
);

Ok(())
Expand Down Expand Up @@ -261,6 +256,11 @@ impl<V: VMExecutor> ChunkExecutorTrait for ChunkExecutor<V> {
)?;
self.commit_chunk()
}

fn reset(&self) -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I always hated the reset() interface, and was willing to kill it after the legacy interfaces x_and_y() gets removed together with state sync v1. Is it possible on the call site to just recreate the ChunkExecutor?

(Doesn't feel too strong either, probably not worth a huge refactor.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think it's gonna be a big-ish refactor to recreate it. If you're okay with it, let's leave it as is for now. We can clean it up when we come to handle the fine grain storage changes.

*self.commit_queue.lock() = ChunkCommitQueue::new_from_db(&self.db.reader)?;
Ok(())
}
}

impl<V: VMExecutor> ChunkExecutor<V> {}
Expand Down
110 changes: 62 additions & 48 deletions state-sync/state-sync-v2/state-sync-driver/src/bootstrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use aptos_types::{
epoch_state::EpochState,
ledger_info::LedgerInfoWithSignatures,
state_store::state_value::StateValueChunkWithProof,
transaction::{
TransactionInfo, TransactionListWithProof, TransactionOutputListWithProof, Version,
},
transaction::{TransactionListWithProof, TransactionOutputListWithProof, Version},
waypoint::Waypoint,
};
use data_streaming_service::{
Expand Down Expand Up @@ -175,6 +173,14 @@ impl VerifiedEpochStates {
}
}

/// Return all epoch ending ledger infos
pub fn all_epoch_ending_ledger_infos(&self) -> Vec<LedgerInfoWithSignatures> {
self.new_epoch_ending_ledger_infos
.values()
.cloned()
.collect()
}

/// Returns any epoch ending ledger info associated with the given version
pub fn get_epoch_ending_ledger_info(
&self,
Expand Down Expand Up @@ -222,7 +228,7 @@ struct AccountStateSyncer {
// Whether or not all states have been synced
is_sync_complete: bool,

// The ledger info we're currently syncing
// The epoch ending ledger info for the version we're syncing
ledger_info_to_sync: Option<LedgerInfoWithSignatures>,

// The next account index to commit (all accounts before this have been
Expand All @@ -233,8 +239,8 @@ struct AccountStateSyncer {
// processed -- i.e., sent to the storage synchronizer).
next_account_index_to_process: u64,

// The transaction info for the version we're trying to sync to
transaction_info_for_version: Option<TransactionInfo>,
// The transaction output (inc. info and proof) for the version we're syncing
transaction_output_to_sync: Option<TransactionOutputListWithProof>,
}

impl AccountStateSyncer {
Expand All @@ -245,7 +251,7 @@ impl AccountStateSyncer {
ledger_info_to_sync: None,
next_account_index_to_commit: 0,
next_account_index_to_process: 0,
transaction_info_for_version: None,
transaction_output_to_sync: None,
}
}

Expand Down Expand Up @@ -406,22 +412,29 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
let highest_known_ledger_info = self.get_highest_known_ledger_info()?;
let highest_known_ledger_version = highest_known_ledger_info.ledger_info().version();

// Check if we've already fetched the required data for bootstrapping
if highest_synced_version >= highest_known_ledger_version
|| self.account_state_syncer.is_sync_complete
{
return self.bootstrapping_complete();
}

// Bootstrap according to the mode
if self.driver_configuration.config.bootstrapping_mode
== BootstrappingMode::DownloadLatestAccountStates
{
self.fetch_all_account_states(highest_known_ledger_info)
.await
} else {
self.fetch_missing_transaction_data(highest_synced_version, highest_known_ledger_info)
// Check if we've already fetched the required data for bootstrapping.
// If not, bootstrap according to the mode.
match self.driver_configuration.config.bootstrapping_mode {
BootstrappingMode::DownloadLatestAccountStates => {
if (self.account_state_syncer.ledger_info_to_sync.is_none()
&& highest_synced_version >= highest_known_ledger_version)
|| self.account_state_syncer.is_sync_complete
{
return self.bootstrapping_complete();
}
self.fetch_all_account_states(highest_known_ledger_info)
.await
}
_ => {
if highest_synced_version >= highest_known_ledger_version {
return self.bootstrapping_complete();
}
self.fetch_missing_transaction_data(
highest_synced_version,
highest_known_ledger_info,
)
.await
}
}
}

Expand Down Expand Up @@ -485,8 +498,6 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
&mut self,
highest_known_ledger_info: LedgerInfoWithSignatures,
) -> Result<(), Error> {
let highest_known_ledger_version = highest_known_ledger_info.ledger_info().version();

// Verify we're trying to sync to an unchanging ledger info
if let Some(ledger_info_to_sync) = &self.account_state_syncer.ledger_info_to_sync {
if ledger_info_to_sync != &highest_known_ledger_info {
Expand All @@ -496,13 +507,14 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
);
}
} else {
self.account_state_syncer.ledger_info_to_sync = Some(highest_known_ledger_info);
self.account_state_syncer.ledger_info_to_sync = Some(highest_known_ledger_info.clone());
}

// Fetch the transaction info first, before the account states
let highest_known_ledger_version = highest_known_ledger_info.ledger_info().version();
let data_stream = if self
.account_state_syncer
.transaction_info_for_version
.transaction_output_to_sync
.is_none()
{
self.streaming_service_client
Expand Down Expand Up @@ -615,7 +627,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
);
return Err(Error::AdvertisedDataError(error_message));
} else if highest_local_epoch_end < highest_advertised_epoch_end {
debug!("Found higher epoch ending ledger infos in the network! Local: {:?}, advertised: {:?}",
info!("Found higher epoch ending ledger infos in the network! Local: {:?}, advertised: {:?}",
highest_local_epoch_end, highest_advertised_epoch_end);
let next_epoch_end = highest_local_epoch_end.checked_add(1).ok_or_else(|| {
Error::IntegerOverflow("The next epoch end has overflown!".into())
Expand All @@ -626,7 +638,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
.await?;
self.active_data_stream = Some(epoch_ending_stream);
} else if self.verified_epoch_states.verified_waypoint() {
debug!("No new epoch ending ledger infos to fetch! All peers are in the same epoch!");
info!("No new epoch ending ledger infos to fetch! All peers are in the same epoch!");
self.verified_epoch_states
.set_fetched_epoch_ending_ledger_infos();
} else {
Expand Down Expand Up @@ -756,18 +768,26 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
.account_state_syncer
.initialized_state_snapshot_receiver
{
let version = self
// Fetch all verified epoch change proofs
let epoch_change_proofs = self.verified_epoch_states.all_epoch_ending_ledger_infos();

// Fetch the target ledger info and transaction info for bootstrapping
let ledger_info_to_sync = self
.account_state_syncer
.ledger_info_to_sync
.as_ref()
.expect("Account state syncer version not initialized!")
.ledger_info()
.version();
let expected_root_hash = account_state_chunk_with_proof.root_hash;
.clone()
.expect("Ledger info to sync is missing!");
let transaction_output_to_sync = self
.account_state_syncer
.transaction_output_to_sync
.clone()
.expect("Transaction output to sync is missing!");

// Initialize the account state synchronizer
self.storage_synchronizer.initialize_account_synchronizer(
expected_root_hash,
version,
None, // TODO(joshlind): support spawning on a given runtime!
epoch_change_proofs,
ledger_info_to_sync,
transaction_output_to_sync,
)?;
self.account_state_syncer
.initialized_state_snapshot_receiver = true;
Expand Down Expand Up @@ -864,7 +884,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
BootstrappingMode::DownloadLatestAccountStates
) && self
.account_state_syncer
.transaction_info_for_version
.transaction_output_to_sync
.is_some())
{
self.terminate_active_stream(notification_id, NotificationFeedback::InvalidPayloadData)
Expand Down Expand Up @@ -997,19 +1017,13 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
.await?;

// Verify the payload proof (the ledger info has already been verified)
// and save the transaction info.
// and save the transaction output with proof.
if let Some(transaction_outputs_with_proof) = transaction_outputs_with_proof {
match &transaction_outputs_with_proof.proof.transaction_infos[..] {
[transaction_info] => {
[_transaction_info] => {
// TODO(joshlind): don't save the transaction info until after verification!
self.account_state_syncer.transaction_info_for_version =
Some(transaction_info.clone());
self.storage_synchronizer.apply_transaction_outputs(
notification_id,
transaction_outputs_with_proof,
ledger_info_to_sync,
None,
)?;
self.account_state_syncer.transaction_output_to_sync =
Some(transaction_outputs_with_proof);
}
_ => {
self.terminate_active_stream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,14 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> ContinuousSyncer<Stora
&mut self,
consensus_sync_request: Arc<Mutex<Option<ConsensusSyncRequest>>>,
) -> Result<(), Error> {
// Fetch transactions or outputs starting at highest_synced_version + 1
// Fetch the highest synced version and epoch (in storage)
let (highest_synced_version, highest_synced_epoch) =
self.get_highest_synced_version_and_epoch()?;

// Fetch the highest epoch state (in storage)
let highest_epoch_state = utils::fetch_latest_epoch_state(self.storage.clone())?;

// Start fetching data at highest_synced_version + 1
let next_version = highest_synced_version
.checked_add(1)
.ok_or_else(|| Error::IntegerOverflow("The next version has overflown!".into()))?;
Expand Down Expand Up @@ -115,7 +120,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> ContinuousSyncer<Stora
}
};
self.speculative_stream_state = Some(SpeculativeStreamState::new(
utils::fetch_latest_epoch_state(self.storage.clone())?,
highest_epoch_state,
None,
highest_synced_version,
));
Expand Down
3 changes: 3 additions & 0 deletions state-sync/state-sync-v2/state-sync-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,9 +518,12 @@ impl<

// Drive progress depending on if we're bootstrapping or continuously syncing
if self.bootstrapper.is_bootstrapped() {
// Fetch any consensus sync requests
let consensus_sync_request = self
.consensus_notification_handler
.get_consensus_sync_request();

// Attempt to continuously sync
if let Err(error) = self
.continuous_syncer
.drive_progress(consensus_sync_request)
Expand Down
Loading