From db17f800bdb609daf9d62a709203c7a446df54d8 Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Fri, 25 Mar 2022 09:41:48 -0400 Subject: [PATCH 1/2] [State Sync] Add a reset() method to the ChunkExecutorTrait. --- execution/executor-types/src/lib.rs | 3 +++ execution/executor/src/chunk_executor.rs | 14 +++++++------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/execution/executor-types/src/lib.rs b/execution/executor-types/src/lib.rs index 53f003ba46ee4..6e564311eb935 100644 --- a/execution/executor-types/src/lib.rs +++ b/execution/executor-types/src/lib.rs @@ -80,6 +80,9 @@ pub trait ChunkExecutorTrait: Send + Sync { verified_target_li: &LedgerInfoWithSignatures, epoch_change_li: Option<&LedgerInfoWithSignatures>, ) -> Result<(Vec, Vec)>; + + /// Resets the chunk executor by synchronizing state with storage. + fn reset(&self) -> Result<()>; } pub trait BlockExecutorTrait: Send + Sync { diff --git a/execution/executor/src/chunk_executor.rs b/execution/executor/src/chunk_executor.rs index f002d7a20ec61..9ecd36050d71f 100644 --- a/execution/executor/src/chunk_executor.rs +++ b/execution/executor/src/chunk_executor.rs @@ -57,11 +57,6 @@ impl ChunkExecutor { } } - pub fn reset(&self) -> Result<()> { - *self.commit_queue.lock() = ChunkCommitQueue::new_from_db(&self.db.reader)?; - Ok(()) - } - fn state_view( &self, latest_view: &ExecutedTrees, @@ -167,7 +162,7 @@ impl ChunkExecutorTrait for ChunkExecutor { .local_synced_version(latest_view.version().unwrap_or(0)) .first_version_in_request(first_version_in_request) .num_txns_in_request(num_txns), - "sync_request_executed", + "Executed transaction chunk!", ); Ok(()) @@ -217,7 +212,7 @@ impl ChunkExecutorTrait for ChunkExecutor { .local_synced_version(latest_view.version().unwrap_or(0)) .first_version_in_request(first_version_in_request) .num_txns_in_request(num_txns), - "sync_request_applied", + "Applied transaction output chunk!", ); Ok(()) @@ -261,6 +256,11 @@ impl ChunkExecutorTrait for ChunkExecutor { )?; self.commit_chunk() } + + fn reset(&self) -> Result<()> { + *self.commit_queue.lock() = ChunkCommitQueue::new_from_db(&self.db.reader)?; + Ok(()) + } } impl ChunkExecutor {} From 57d8f547806b5e33fa147508042181f244ddeac9 Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Fri, 25 Mar 2022 09:43:30 -0400 Subject: [PATCH 2/2] [State Sync] Refactor common backup functionality and expose via DbWriter Closes: #318 --- .../state-sync-driver/src/bootstrapper.rs | 110 +++++++++------- .../src/continuous_syncer.rs | 9 +- .../state-sync-driver/src/driver.rs | 3 + .../src/storage_synchronizer.rs | 124 +++++++++++++----- storage/aptosdb/src/backup/mod.rs | 1 + storage/aptosdb/src/backup/restore_handler.rs | 79 +++-------- storage/aptosdb/src/backup/restore_utils.rs | 101 ++++++++++++++ storage/aptosdb/src/event_store/mod.rs | 2 +- storage/aptosdb/src/lib.rs | 73 ++++++++++- storage/aptosdb/src/transaction_store/mod.rs | 2 +- storage/storage-interface/src/lib.rs | 43 ++++-- testsuite/smoke-test/src/state_sync_v2.rs | 3 +- 12 files changed, 384 insertions(+), 166 deletions(-) create mode 100644 storage/aptosdb/src/backup/restore_utils.rs diff --git a/state-sync/state-sync-v2/state-sync-driver/src/bootstrapper.rs b/state-sync/state-sync-v2/state-sync-driver/src/bootstrapper.rs index 89e5706725ea4..9f620ff08a2d8 100644 --- a/state-sync/state-sync-v2/state-sync-driver/src/bootstrapper.rs +++ b/state-sync/state-sync-v2/state-sync-driver/src/bootstrapper.rs @@ -13,9 +13,7 @@ use aptos_types::{ epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, state_store::state_value::StateValueChunkWithProof, - transaction::{ - TransactionInfo, TransactionListWithProof, TransactionOutputListWithProof, Version, - }, + transaction::{TransactionListWithProof, TransactionOutputListWithProof, Version}, waypoint::Waypoint, }; use data_streaming_service::{ @@ -175,6 +173,14 @@ impl VerifiedEpochStates { } } + /// Return all epoch ending ledger infos + pub fn all_epoch_ending_ledger_infos(&self) -> Vec { + self.new_epoch_ending_ledger_infos + .values() + .cloned() + .collect() + } + /// Returns any epoch ending ledger info associated with the given version pub fn get_epoch_ending_ledger_info( &self, @@ -222,7 +228,7 @@ struct AccountStateSyncer { // Whether or not all states have been synced is_sync_complete: bool, - // The ledger info we're currently syncing + // The epoch ending ledger info for the version we're syncing ledger_info_to_sync: Option, // The next account index to commit (all accounts before this have been @@ -233,8 +239,8 @@ struct AccountStateSyncer { // processed -- i.e., sent to the storage synchronizer). next_account_index_to_process: u64, - // The transaction info for the version we're trying to sync to - transaction_info_for_version: Option, + // The transaction output (inc. info and proof) for the version we're syncing + transaction_output_to_sync: Option, } impl AccountStateSyncer { @@ -245,7 +251,7 @@ impl AccountStateSyncer { ledger_info_to_sync: None, next_account_index_to_commit: 0, next_account_index_to_process: 0, - transaction_info_for_version: None, + transaction_output_to_sync: None, } } @@ -406,22 +412,29 @@ impl Bootstrapper= highest_known_ledger_version - || self.account_state_syncer.is_sync_complete - { - return self.bootstrapping_complete(); - } - - // Bootstrap according to the mode - if self.driver_configuration.config.bootstrapping_mode - == BootstrappingMode::DownloadLatestAccountStates - { - self.fetch_all_account_states(highest_known_ledger_info) - .await - } else { - self.fetch_missing_transaction_data(highest_synced_version, highest_known_ledger_info) + // Check if we've already fetched the required data for bootstrapping. + // If not, bootstrap according to the mode. + match self.driver_configuration.config.bootstrapping_mode { + BootstrappingMode::DownloadLatestAccountStates => { + if (self.account_state_syncer.ledger_info_to_sync.is_none() + && highest_synced_version >= highest_known_ledger_version) + || self.account_state_syncer.is_sync_complete + { + return self.bootstrapping_complete(); + } + self.fetch_all_account_states(highest_known_ledger_info) + .await + } + _ => { + if highest_synced_version >= highest_known_ledger_version { + return self.bootstrapping_complete(); + } + self.fetch_missing_transaction_data( + highest_synced_version, + highest_known_ledger_info, + ) .await + } } } @@ -485,8 +498,6 @@ impl Bootstrapper Result<(), Error> { - let highest_known_ledger_version = highest_known_ledger_info.ledger_info().version(); - // Verify we're trying to sync to an unchanging ledger info if let Some(ledger_info_to_sync) = &self.account_state_syncer.ledger_info_to_sync { if ledger_info_to_sync != &highest_known_ledger_info { @@ -496,13 +507,14 @@ impl Bootstrapper Bootstrapper Bootstrapper Bootstrapper Bootstrapper Bootstrapper { + [_transaction_info] => { // TODO(joshlind): don't save the transaction info until after verification! - self.account_state_syncer.transaction_info_for_version = - Some(transaction_info.clone()); - self.storage_synchronizer.apply_transaction_outputs( - notification_id, - transaction_outputs_with_proof, - ledger_info_to_sync, - None, - )?; + self.account_state_syncer.transaction_output_to_sync = + Some(transaction_outputs_with_proof); } _ => { self.terminate_active_stream( diff --git a/state-sync/state-sync-v2/state-sync-driver/src/continuous_syncer.rs b/state-sync/state-sync-v2/state-sync-driver/src/continuous_syncer.rs index fc1b33216e865..82d30a502b0bd 100644 --- a/state-sync/state-sync-v2/state-sync-driver/src/continuous_syncer.rs +++ b/state-sync/state-sync-v2/state-sync-driver/src/continuous_syncer.rs @@ -81,9 +81,14 @@ impl ContinuousSyncer>>, ) -> Result<(), Error> { - // Fetch transactions or outputs starting at highest_synced_version + 1 + // Fetch the highest synced version and epoch (in storage) let (highest_synced_version, highest_synced_epoch) = self.get_highest_synced_version_and_epoch()?; + + // Fetch the highest epoch state (in storage) + let highest_epoch_state = utils::fetch_latest_epoch_state(self.storage.clone())?; + + // Start fetching data at highest_synced_version + 1 let next_version = highest_synced_version .checked_add(1) .ok_or_else(|| Error::IntegerOverflow("The next version has overflown!".into()))?; @@ -115,7 +120,7 @@ impl ContinuousSyncer, ) -> Result<(), Error>; - /// Initializes an account synchronizer with the specified version and - /// expected root hash and spawns the synchronizer on the given `runtime`. - /// If `runtime` is None, uses the current runtime. + /// Initializes an account synchronizer with the specified + /// `target_ledger_info` and `target_output_with_proof` at the target + /// syncing version. Also, writes all `epoch_change_proofs` to storage. /// - /// Note: this assumes `expected_root_hash` has already been verified. + /// Note: this assumes that `epoch_change_proofs`, `target_ledger_info`, + /// and `target_output_with_proof` have already been verified. fn initialize_account_synchronizer( &mut self, - expected_root_hash: HashValue, - version: Version, - runtime: Option<&Runtime>, + epoch_change_proofs: Vec, + target_ledger_info: LedgerInfoWithSignatures, + target_output_with_proof: TransactionOutputListWithProof, ) -> Result<(), Error>; /// Returns true iff there is storage data that is still waiting @@ -83,8 +83,10 @@ pub trait StorageSynchronizerInterface { } /// The implementation of the `StorageSynchronizerInterface` used by state sync -#[derive(Clone)] -pub struct StorageSynchronizer { +pub struct StorageSynchronizer { + // The executor for transaction and transaction output chunks + chunk_executor: Arc, + // A channel through which to notify the driver of committed data commit_notification_sender: mpsc::UnboundedSender, @@ -97,6 +99,9 @@ pub struct StorageSynchronizer { // The number of storage data chunks pending execute/apply, or commit pending_data_chunks: Arc, + // An optional runtime on which to spawn the storage synchronizer threads + runtime: Option, + // The channel through which to notify the state snapshot receiver of new data chunks state_snapshot_notifier: Option>, @@ -104,8 +109,25 @@ pub struct StorageSynchronizer { storage: Arc, } -impl StorageSynchronizer { - pub fn new( +// TODO(joshlind): this cannot currently be derived because of limitations around +// how deriving `Clone` works. See: https://github.com/rust-lang/rust/issues/26925. +impl Clone for StorageSynchronizer { + fn clone(&self) -> Self { + Self { + chunk_executor: self.chunk_executor.clone(), + commit_notification_sender: self.commit_notification_sender.clone(), + error_notification_sender: self.error_notification_sender.clone(), + executor_notifier: self.executor_notifier.clone(), + pending_data_chunks: self.pending_data_chunks.clone(), + runtime: self.runtime.clone(), + state_snapshot_notifier: self.state_snapshot_notifier.clone(), + storage: self.storage.clone(), + } + } +} + +impl StorageSynchronizer { + pub fn new( chunk_executor: Arc, commit_notification_sender: mpsc::UnboundedSender, error_notification_sender: mpsc::UnboundedSender, @@ -122,30 +144,33 @@ impl StorageSynchronizer { let pending_transaction_chunks = Arc::new(AtomicU64::new(0)); // Spawn the executor that executes/applies storage data chunks + let runtime = runtime.map(|runtime| runtime.handle().clone()); spawn_executor( chunk_executor.clone(), error_notification_sender.clone(), executor_listener, committer_notifier, pending_transaction_chunks.clone(), - runtime, + runtime.clone(), ); // Spawn the committer that commits executed (but pending) chunks spawn_committer( - chunk_executor, + chunk_executor.clone(), committer_listener, commit_notification_sender.clone(), error_notification_sender.clone(), pending_transaction_chunks.clone(), - runtime, + runtime.clone(), ); Self { + chunk_executor, commit_notification_sender, error_notification_sender, executor_notifier, pending_data_chunks: pending_transaction_chunks, + runtime, state_snapshot_notifier: None, storage, } @@ -165,7 +190,9 @@ impl StorageSynchronizer { } } -impl StorageSynchronizerInterface for StorageSynchronizer { +impl StorageSynchronizerInterface + for StorageSynchronizer +{ fn apply_transaction_outputs( &mut self, notification_id: NotificationId, @@ -200,23 +227,25 @@ impl StorageSynchronizerInterface for StorageSynchronizer { fn initialize_account_synchronizer( &mut self, - expected_root_hash: HashValue, - version: Version, - runtime: Option<&Runtime>, + epoch_change_proofs: Vec, + target_ledger_info: LedgerInfoWithSignatures, + target_output_with_proof: TransactionOutputListWithProof, ) -> Result<(), Error> { // Create a channel to notify the state snapshot receiver when data chunks are ready let (state_snapshot_notifier, state_snapshot_listener) = mpsc::channel(MAX_PENDING_CHUNKS); // Spawn the state snapshot receiver that commits account states spawn_state_snapshot_receiver( + self.chunk_executor.clone(), state_snapshot_listener, self.commit_notification_sender.clone(), - expected_root_hash, self.error_notification_sender.clone(), self.pending_data_chunks.clone(), self.storage.clone(), - version, - runtime, + epoch_change_proofs, + target_ledger_info, + target_output_with_proof, + self.runtime.clone(), ); self.state_snapshot_notifier = Some(state_snapshot_notifier); @@ -277,7 +306,7 @@ fn spawn_executor( mut executor_listener: mpsc::Receiver, mut committer_notifier: mpsc::Sender, pending_transaction_chunks: Arc, - runtime: Option<&Runtime>, + runtime: Option, ) { // Create an executor let executor = async move { @@ -340,7 +369,7 @@ fn spawn_committer( mut commit_notification_sender: mpsc::UnboundedSender, error_notification_sender: mpsc::UnboundedSender, pending_transaction_chunks: Arc, - runtime: Option<&Runtime>, + runtime: Option, ) { // Create a committer let committer = async move { @@ -373,21 +402,35 @@ fn spawn_committer( } /// Spawns a dedicated receiver that commits accounts from a state snapshot -fn spawn_state_snapshot_receiver( +fn spawn_state_snapshot_receiver( + chunk_executor: Arc, mut state_snapshot_listener: mpsc::Receiver, mut commit_notification_sender: mpsc::UnboundedSender, - expected_root_hash: HashValue, error_notification_sender: mpsc::UnboundedSender, pending_transaction_chunks: Arc, storage: Arc, - version: Version, - runtime: Option<&Runtime>, + epoch_change_proofs: Vec, + target_ledger_info: LedgerInfoWithSignatures, + target_output_with_proof: TransactionOutputListWithProof, + runtime: Option, ) { // Create a state snapshot receiver let receiver = async move { + // Get the target version and expected root hash + let version = target_ledger_info.ledger_info().version(); + let expected_root_hash = target_output_with_proof + .proof + .transaction_infos + .first() + .expect("Target transaction info should exist!") + .state_change_hash(); + + // Create the snapshot receiver let mut state_snapshot_receiver = storage .get_state_snapshot_receiver(version, expected_root_hash) .expect("Failed to initialize the state snapshot receiver!"); + + // Handle account state chunks loop { ::futures::select! { storage_data_chunk = state_snapshot_listener.select_next_some() => { @@ -410,11 +453,24 @@ fn spawn_state_snapshot_receiver( let error = format!("Failed to send account commit notification! Error: {:?}", error); send_storage_synchronizer_error(error_notification_sender.clone(), notification_id, error).await; } else if all_accounts_synced { - // Update the receiver and return - if let Err(error) = state_snapshot_receiver.finish_box() { - let error = format!("Failed to finish the account states synchronization! Error: {:?}", error); + // We're done synchronizing account states. Finalize storage and reset the executor. + let finalized_result = if let Err(error) = state_snapshot_receiver.finish_box() { + Err(format!("Failed to finish the account states synchronization! Error: {:?}", error)) + } else if let Err(error) = storage.finalize_state_snapshot(version, target_output_with_proof) { + Err(format!("Failed to finalize the state snapshot! Error: {:?}", error)) + } else if let Err(error) = storage.save_ledger_infos(&epoch_change_proofs) { + Err(format!("Failed to save all epoch ending ledger infos! Error: {:?}", error)) + } else if let Err(error) = chunk_executor.reset() { // Reset the chunk executor (to read the latest db state) + Err(format!("Failed to reset the chunk executor after account states synchronization! Error: {:?}", error)) + } else { + Ok(()) + }; + + // Notify the state sync driver of any errors + if let Err(error) = finalized_result { send_storage_synchronizer_error(error_notification_sender.clone(), notification_id, error).await; } + decrement_atomic(pending_transaction_chunks.clone()); return; } @@ -441,7 +497,7 @@ fn spawn_state_snapshot_receiver( /// Spawns a future on a specified runtime. If no runtime is specified, uses /// the current runtime. -fn spawn(runtime: Option<&Runtime>, future: impl Future + Send + 'static) { +fn spawn(runtime: Option, future: impl Future + Send + 'static) { if let Some(runtime) = runtime { runtime.spawn(future); } else { diff --git a/storage/aptosdb/src/backup/mod.rs b/storage/aptosdb/src/backup/mod.rs index d0e4a0cd6232a..6ffa3ef5084e9 100644 --- a/storage/aptosdb/src/backup/mod.rs +++ b/storage/aptosdb/src/backup/mod.rs @@ -3,6 +3,7 @@ pub mod backup_handler; pub mod restore_handler; +pub mod restore_utils; #[cfg(test)] mod test; diff --git a/storage/aptosdb/src/backup/restore_handler.rs b/storage/aptosdb/src/backup/restore_handler.rs index f1a339bf9a806..988e95ec5ac7b 100644 --- a/storage/aptosdb/src/backup/restore_handler.rs +++ b/storage/aptosdb/src/backup/restore_handler.rs @@ -2,17 +2,16 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - change_set::ChangeSet, event_store::EventStore, ledger_store::LedgerStore, - schema::transaction_accumulator::TransactionAccumulatorSchema, state_store::StateStore, - transaction_store::TransactionStore, AptosDB, + backup::restore_utils, event_store::EventStore, ledger_store::LedgerStore, + state_store::StateStore, transaction_store::TransactionStore, AptosDB, }; -use anyhow::{ensure, Result}; +use anyhow::Result; use aptos_crypto::{hash::SPARSE_MERKLE_PLACEHOLDER_HASH, HashValue}; use aptos_jellyfish_merkle::restore::JellyfishMerkleRestore; use aptos_types::{ contract_event::ContractEvent, ledger_info::LedgerInfoWithSignatures, - proof::{definition::LeafCount, position::FrozenSubTreeIterator}, + proof::definition::LeafCount, state_store::state_value::StateValue, transaction::{Transaction, TransactionInfo, Version, PRE_GENESIS_VERSION}, }; @@ -63,25 +62,7 @@ impl RestoreHandler { } pub fn save_ledger_infos(&self, ledger_infos: &[LedgerInfoWithSignatures]) -> Result<()> { - ensure!(!ledger_infos.is_empty(), "No LedgerInfos to save."); - - let mut cs = ChangeSet::new(); - ledger_infos - .iter() - .map(|li| self.ledger_store.put_ledger_info(li, &mut cs)) - .collect::>>()?; - self.db.write_schemas(cs.batch)?; - - if let Some(li) = self.ledger_store.get_latest_ledger_info_option() { - if li.ledger_info().epoch() > ledger_infos.last().unwrap().ledger_info().epoch() { - // No need to update latest ledger info. - return Ok(()); - } - } - - self.ledger_store - .set_latest_ledger_info(ledger_infos.last().unwrap().clone()); - Ok(()) + restore_utils::save_ledger_infos(self.db.clone(), self.ledger_store.clone(), ledger_infos) } pub fn confirm_or_save_frozen_subtrees( @@ -89,34 +70,7 @@ impl RestoreHandler { num_leaves: LeafCount, frozen_subtrees: &[HashValue], ) -> Result<()> { - let mut cs = ChangeSet::new(); - let positions: Vec<_> = FrozenSubTreeIterator::new(num_leaves).collect(); - - ensure!( - positions.len() == frozen_subtrees.len(), - "Number of frozen subtree roots not expected. Expected: {}, actual: {}", - positions.len(), - frozen_subtrees.len(), - ); - - positions - .iter() - .zip(frozen_subtrees.iter().rev()) - .map(|(p, h)| { - if let Some(_h) = self.db.get::(p)? { - ensure!( - h == &_h, - "Frozen subtree root does not match that already in DB. Provided: {}, in db: {}.", - h, - _h, - ); - } else { - cs.batch.put::(p, h)?; - } - Ok(()) - }) - .collect::>>()?; - self.db.write_schemas(cs.batch) + restore_utils::confirm_or_save_frozen_subtrees(self.db.clone(), num_leaves, frozen_subtrees) } pub fn save_transactions( @@ -126,17 +80,16 @@ impl RestoreHandler { txn_infos: &[TransactionInfo], events: &[Vec], ) -> Result<()> { - let mut cs = ChangeSet::new(); - for (idx, txn) in txns.iter().enumerate() { - self.transaction_store - .put_transaction(first_version + idx as Version, txn, &mut cs)?; - } - self.ledger_store - .put_transaction_infos(first_version, txn_infos, &mut cs)?; - self.event_store - .put_events_multiple_versions(first_version, events, &mut cs)?; - - self.db.write_schemas(cs.batch) + restore_utils::save_transactions( + self.db.clone(), + self.ledger_store.clone(), + self.transaction_store.clone(), + self.event_store.clone(), + first_version, + txns, + txn_infos, + events, + ) } pub fn get_tree_state(&self, num_transactions: LeafCount) -> Result { diff --git a/storage/aptosdb/src/backup/restore_utils.rs b/storage/aptosdb/src/backup/restore_utils.rs new file mode 100644 index 0000000000000..67e8fc984f22a --- /dev/null +++ b/storage/aptosdb/src/backup/restore_utils.rs @@ -0,0 +1,101 @@ +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 + +///! This file contains utilities that are helpful for performing +///! database restore operations, as required by db-restore and +///! state sync v2. +use crate::{ + change_set::ChangeSet, event_store::EventStore, ledger_store::LedgerStore, + schema::transaction_accumulator::TransactionAccumulatorSchema, + transaction_store::TransactionStore, +}; +use anyhow::{ensure, Result}; +use aptos_crypto::HashValue; +use aptos_types::{ + contract_event::ContractEvent, + ledger_info::LedgerInfoWithSignatures, + proof::{definition::LeafCount, position::FrozenSubTreeIterator}, + transaction::{Transaction, TransactionInfo, Version}, +}; +use schemadb::DB; +use std::sync::Arc; + +pub fn save_ledger_infos( + db: Arc, + ledger_store: Arc, + ledger_infos: &[LedgerInfoWithSignatures], +) -> Result<()> { + ensure!(!ledger_infos.is_empty(), "No LedgerInfos to save."); + + let mut cs = ChangeSet::new(); + ledger_infos + .iter() + .map(|li| ledger_store.put_ledger_info(li, &mut cs)) + .collect::>>()?; + db.write_schemas(cs.batch)?; + + if let Some(li) = ledger_store.get_latest_ledger_info_option() { + if li.ledger_info().epoch() > ledger_infos.last().unwrap().ledger_info().epoch() { + // No need to update latest ledger info. + return Ok(()); + } + } + + ledger_store.set_latest_ledger_info(ledger_infos.last().unwrap().clone()); + Ok(()) +} + +pub fn confirm_or_save_frozen_subtrees( + db: Arc, + num_leaves: LeafCount, + frozen_subtrees: &[HashValue], +) -> Result<()> { + let mut cs = ChangeSet::new(); + let positions: Vec<_> = FrozenSubTreeIterator::new(num_leaves).collect(); + + ensure!( + positions.len() == frozen_subtrees.len(), + "Number of frozen subtree roots not expected. Expected: {}, actual: {}", + positions.len(), + frozen_subtrees.len(), + ); + + positions + .iter() + .zip(frozen_subtrees.iter().rev()) + .map(|(p, h)| { + if let Some(_h) = db.get::(p)? { + ensure!( + h == &_h, + "Frozen subtree root does not match that already in DB. Provided: {}, in db: {}.", + h, + _h, + ); + } else { + cs.batch.put::(p, h)?; + } + Ok(()) + }) + .collect::>>()?; + db.write_schemas(cs.batch) +} + +pub fn save_transactions( + db: Arc, + ledger_store: Arc, + transaction_store: Arc, + event_store: Arc, + first_version: Version, + txns: &[Transaction], + txn_infos: &[TransactionInfo], + events: &[Vec], +) -> Result<()> { + let mut cs = ChangeSet::new(); + for (idx, txn) in txns.iter().enumerate() { + transaction_store.put_transaction(first_version + idx as Version, txn, &mut cs)?; + } + ledger_store.put_transaction_infos(first_version, txn_infos, &mut cs)?; + event_store.put_events_multiple_versions(first_version, events, &mut cs)?; + + db.write_schemas(cs.batch) +} diff --git a/storage/aptosdb/src/event_store/mod.rs b/storage/aptosdb/src/event_store/mod.rs index f8fade421ba39..50fd36fbd321a 100644 --- a/storage/aptosdb/src/event_store/mod.rs +++ b/storage/aptosdb/src/event_store/mod.rs @@ -39,7 +39,7 @@ use std::{ }; #[derive(Debug)] -pub(crate) struct EventStore { +pub struct EventStore { db: Arc, } diff --git a/storage/aptosdb/src/lib.rs b/storage/aptosdb/src/lib.rs index 8c632ac492c1c..db3dc90edf8e8 100644 --- a/storage/aptosdb/src/lib.rs +++ b/storage/aptosdb/src/lib.rs @@ -37,7 +37,7 @@ mod aptosdb_test; pub use aptosdb_test::test_save_blocks_impl; use crate::{ - backup::{backup_handler::BackupHandler, restore_handler::RestoreHandler}, + backup::{backup_handler::BackupHandler, restore_handler::RestoreHandler, restore_utils}, change_set::{ChangeSet, SealedChangeSet}, errors::AptosDbError, event_store::EventStore, @@ -76,9 +76,9 @@ use aptos_types::{ state_value::{StateValue, StateValueChunkWithProof, StateValueWithProof}, }, transaction::{ - AccountTransactionsWithProof, TransactionInfo, TransactionListWithProof, TransactionOutput, - TransactionOutputListWithProof, TransactionToCommit, TransactionWithProof, Version, - PRE_GENESIS_VERSION, + AccountTransactionsWithProof, Transaction, TransactionInfo, TransactionListWithProof, + TransactionOutput, TransactionOutputListWithProof, TransactionToCommit, + TransactionWithProof, Version, PRE_GENESIS_VERSION, }, }; use itertools::zip_eq; @@ -1242,6 +1242,16 @@ impl ResourceResolver for AptosDB { impl MoveDbReader for AptosDB {} impl DbWriter for AptosDB { + fn save_ledger_infos(&self, ledger_infos: &[LedgerInfoWithSignatures]) -> Result<()> { + gauged_api("save_ledger_infos", || { + restore_utils::save_ledger_infos( + self.db.clone(), + self.ledger_store.clone(), + ledger_infos, + ) + }) + } + /// `first_version` is the version of the first transaction in `txns_to_commit`. /// When `ledger_info_with_sigs` is provided, verify that the transaction accumulator root hash /// it carries is generated after the `txns_to_commit` are applied. @@ -1342,6 +1352,61 @@ impl DbWriter for AptosDB { .get_snapshot_receiver(version, expected_root_hash) }) } + + fn finalize_state_snapshot( + &self, + version: Version, + output_with_proof: TransactionOutputListWithProof, + ) -> Result<()> { + gauged_api("finalize_state_snapshot", || { + // Ensure the output with proof only contains a single transaction output and info + let num_transaction_outputs = output_with_proof.transactions_and_outputs.len(); + let num_transaction_infos = output_with_proof.proof.transaction_infos.len(); + ensure!( + num_transaction_outputs == 1, + "Number of transaction outputs should == 1, but got: {}", + num_transaction_outputs + ); + ensure!( + num_transaction_infos == 1, + "Number of transaction infos should == 1, but got: {}", + num_transaction_infos + ); + + // Update the merkle accumulator using the given proof + let frozen_subtrees = output_with_proof + .proof + .ledger_info_to_transaction_infos_proof + .left_siblings(); + restore_utils::confirm_or_save_frozen_subtrees( + self.db.clone(), + version, + frozen_subtrees, + )?; + + // Insert the target transactions, infos and events into the database + let (transactions, outputs): (Vec, Vec) = + output_with_proof + .transactions_and_outputs + .into_iter() + .unzip(); + let events = outputs + .into_iter() + .map(|output| output.events().to_vec()) + .collect::>(); + let transaction_infos = output_with_proof.proof.transaction_infos; + restore_utils::save_transactions( + self.db.clone(), + self.ledger_store.clone(), + self.transaction_store.clone(), + self.event_store.clone(), + version, + &transactions, + &transaction_infos, + &events, + ) + }) + } } // Convert requested range and order to a range in ascending order. diff --git a/storage/aptosdb/src/transaction_store/mod.rs b/storage/aptosdb/src/transaction_store/mod.rs index 9d718e86ba99a..05a5bd6f16f45 100644 --- a/storage/aptosdb/src/transaction_store/mod.rs +++ b/storage/aptosdb/src/transaction_store/mod.rs @@ -26,7 +26,7 @@ use schemadb::{ReadOptions, SchemaBatch, SchemaIterator, DB}; use std::sync::Arc; #[derive(Debug)] -pub(crate) struct TransactionStore { +pub struct TransactionStore { db: Arc, } diff --git a/storage/storage-interface/src/lib.rs b/storage/storage-interface/src/lib.rs index 84714879e2878..aa7376b0ea31d 100644 --- a/storage/storage-interface/src/lib.rs +++ b/storage/storage-interface/src/lib.rs @@ -572,6 +572,38 @@ impl MoveStorage for &dyn DbReader { /// expected of an Aptos DB. This adds write APIs to DbReader. #[allow(unused_variables)] pub trait DbWriter: Send + Sync { + /// Get a (stateful) state snapshot receiver. + /// + /// Chunk of accounts need to be added via `add_chunk()` before finishing up with `finish_box()` + fn get_state_snapshot_receiver( + &self, + version: Version, + expected_root_hash: HashValue, + ) -> Result>> { + unimplemented!() + } + + /// Finalizes a state snapshot that has already been restored to the database through + /// a state snapshot receiver. This is required to bootstrap the transaction accumulator + /// and populate transaction and event information. + /// + /// Note: this assumes that the output with proof has already been verified and that the + /// state snapshot was restored at the same version. + fn finalize_state_snapshot( + &self, + version: Version, + output_with_proof: TransactionOutputListWithProof, + ) -> Result<()> { + unimplemented!() + } + + /// Persists the specified ledger infos. + /// + /// Note: this assumes that the ledger infos have already been verified. + fn save_ledger_infos(&self, ledger_infos: &[LedgerInfoWithSignatures]) -> Result<()> { + unimplemented!() + } + /// Persist transactions. Called by the executor module when either syncing nodes or committing /// blocks during normal operation. /// See [`AptosDB::save_transactions`]. @@ -585,17 +617,6 @@ pub trait DbWriter: Send + Sync { ) -> Result<()> { unimplemented!() } - - /// Get a (stateful) state snapshot receiver. - /// - /// Chunk of accounts need to be added via `add_chunk()` before finishing up with `finish_box()` - fn get_state_snapshot_receiver( - &self, - version: Version, - expected_root_hash: HashValue, - ) -> Result>> { - unimplemented!() - } } pub trait MoveDbReader: diff --git a/testsuite/smoke-test/src/state_sync_v2.rs b/testsuite/smoke-test/src/state_sync_v2.rs index e09a91ac93d29..29e751a1ac24e 100644 --- a/testsuite/smoke-test/src/state_sync_v2.rs +++ b/testsuite/smoke-test/src/state_sync_v2.rs @@ -32,8 +32,7 @@ async fn test_full_node_bootstrap_accounts() { let vfn_peer_id = create_full_node(vfn_config, &mut swarm).await; swarm.fullnode_mut(vfn_peer_id).unwrap().stop(); - // Enable account count support for the validator (with at most 2 accounts - // per storage request). + // Set at most 2 accounts per storage request for the validator let validator = swarm.validators_mut().next().unwrap(); let mut config = validator.config().clone(); config