From af8a09248aeba0ce2c7b2777013dc45513e7c9f8 Mon Sep 17 00:00:00 2001 From: Min Zhang Date: Thu, 22 Sep 2022 00:16:32 -0400 Subject: [PATCH 1/7] draft implementation of flatstoragestate --- chain/chain/src/chain.rs | 134 +++++++----- chain/chain/src/store.rs | 19 ++ core/primitives/src/errors.rs | 2 + core/store/src/flat_state.rs | 400 +++++++++++++++++++++++----------- nearcore/src/runtime/mod.rs | 5 +- tools/mock-node/Cargo.toml | 1 + 6 files changed, 380 insertions(+), 181 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 5acdf0e8d75..bd09fa4b7b2 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -50,7 +50,7 @@ use near_primitives::views::{ }; #[cfg(feature = "protocol_feature_flat_state")] use near_store::flat_state; -use near_store::{DBCol, ShardTries, StoreUpdate}; +use near_store::{DBCol, ShardTries, StoreUpdate, WrappedTrieChanges}; use crate::block_processing_utils::{ BlockPreprocessInfo, BlockProcessingArtifact, BlocksInProcessing, DoneApplyChunkCallback, @@ -80,9 +80,8 @@ use near_primitives::shard_layout::{ account_id_to_shard_id, account_id_to_shard_uid, ShardLayout, ShardUId, }; use near_primitives::version::PROTOCOL_VERSION; -use near_store::flat_state::FlatStateDelta; #[cfg(feature = "protocol_feature_flat_state")] -use near_store::flat_state::FlatStorageState; +use near_store::flat_state::{FlatStateDelta, FlatStorageState}; use once_cell::sync::OnceCell; use rayon::iter::{IntoParallelIterator, ParallelIterator}; @@ -628,7 +627,7 @@ impl Chain { &mut tmp_store_update, shard_id, block_head.last_block_hash, - &FlatStateDelta::new(), + &FlatStateDelta::default(), )?; } store_update.merge(tmp_store_update); @@ -647,18 +646,9 @@ impl Chain { for shard_id in 0..runtime_adapter.num_shards(&block_head.epoch_id)? { let flat_storage_state = FlatStorageState::new( store.store().clone(), - store.head()?.height, - &|hash| store.get_block_header(hash).unwrap(), - &|height| { - store - .get_all_block_hashes_by_height(height) - .unwrap() - .values() - .flatten() - .copied() - .collect::>() - }, shard_id, + store.head().unwrap().height, + &store, ); runtime_adapter.add_flat_storage_state_for_shard(shard_id, flat_storage_state); } @@ -2085,6 +2075,41 @@ impl Chain { chain_update.postprocess_block(me, &block, block_preprocess_info, apply_results)?; chain_update.commit()?; + // Update flat storage head to be the last final block. Note that this update happens + // in a separate db transaction from the update from block processing. This is intentional + // because flat_storage_state need to be locked during the update of flat head, otherwise + // flat_storage_state is in an inconsistent state that could be accessed by the other + // apply chunks processes. This means, the flat head is not always the same as + // the last final block on chain, which is OK, because in the flat storage implementation + // we don't assume that. + // Also note, for now, we only update flat storage for the shards that we care about in this epoch + // TODO (#7327): support flat storage for state sync and block catchups + for shard_id in 0..self.runtime_adapter.num_shards(block.header().epoch_id())? { + if self.runtime_adapter.cares_about_shard( + None, + block.header().prev_hash(), + shard_id, + true, + ) { + if let Some(flat_storage_state) = + self.runtime_adapter.get_flat_storage_state_for_shard(shard_id) + { + let new_flat_head = block.header().last_final_block(); + flat_storage_state.update_flat_head(new_flat_head).unwrap_or_else(|_| { + panic!( + "Cannot update flat head from {:?} to {:?}", + flat_storage_state.get_flat_head(), + new_flat_head + ) + }); + } else { + // TODO (#7327): some error handling code here. Should probably return an error (or panic?) + // here if the flat storage doesn't exist. We don't do that yet because + // flat storage is not fully enabled yet. + } + } + } + self.pending_state_patch.clear(); if let Some(tip) = &new_head { @@ -3161,7 +3186,7 @@ impl Chain { let block = self.store.get_block(block_hash)?; let prev_block = self.store.get_block(block.header().prev_hash())?; let mut chain_update = self.chain_update(); - chain_update.apply_chunk_postprocessing(&block, &prev_block, results, true)?; + chain_update.apply_chunk_postprocessing(&block, &prev_block, results)?; chain_update.commit()?; Ok(()) } @@ -3603,7 +3628,10 @@ impl Chain { &transaction.transaction.block_hash, transaction_validity_period, ) - .map_err(|_| Error::from(Error::InvalidTransactions))?; + .map_err(|_| { + tracing::warn!("Invalid Transactions for mock node"); + Error::from(Error::InvalidTransactions) + })?; } }; @@ -4490,15 +4518,14 @@ impl<'a> ChainUpdate<'a> { block: &Block, prev_block: &Block, apply_results: Vec>, - is_catching_up: bool, ) -> Result<(), Error> { let _span = tracing::debug_span!(target: "chain", "apply_chunk_postprocessing").entered(); for result in apply_results { self.process_apply_chunk_result( result?, *block.hash(), + block.header().height(), *prev_block.hash(), - is_catching_up, )? } Ok(()) @@ -4630,13 +4657,36 @@ impl<'a> ChainUpdate<'a> { Ok(()) } + #[allow(unused_variables)] + fn save_flat_state_changes( + &mut self, + block_hash: CryptoHash, + prev_hash: CryptoHash, + height: BlockHeight, + shard_id: ShardId, + trie_changes: &WrappedTrieChanges, + ) -> Result<(), Error> { + #[cfg(feature = "protocol_feature_flat_state")] + if self.runtime_adapter.cares_about_shard(None, &block_hash, shard_id, true) { + if let Some(chain_flat_storage) = + self.runtime_adapter.get_flat_storage_state_for_shard(shard_id) + { + let delta = FlatStateDelta::from_state_changes(&trie_changes.state_changes()); + let block_info = flat_state::BlockInfo { hash: block_hash, height, prev_hash }; + let store_update = chain_flat_storage.add_block(&block_hash, delta, block_info)?; + self.chain_store_update.merge(store_update); + } + } + Ok(()) + } + /// Processed results of applying chunk fn process_apply_chunk_result( &mut self, result: ApplyChunkResult, block_hash: CryptoHash, + height: BlockHeight, prev_block_hash: CryptoHash, - is_catching_up: bool, ) -> Result<(), Error> { match result { ApplyChunkResult::SameHeight(SameHeightResult { @@ -4664,17 +4714,13 @@ impl<'a> ChainUpdate<'a> { ); // Right now, we don't implement flat storage for catchup, so we only store // the delta if we are not catching up - if !is_catching_up { - if let Some(chain_flat_storage) = - self.runtime_adapter.get_flat_storage_state_for_shard(shard_id) - { - let delta = FlatStateDelta::from_state_changes( - &apply_result.trie_changes.state_changes(), - ); - let store_update = chain_flat_storage.add_delta(&block_hash, delta)?; - self.chain_store_update.merge(store_update); - } - } + self.save_flat_state_changes( + block_hash, + prev_block_hash, + height, + shard_id, + &apply_result.trie_changes, + )?; self.chain_store_update.save_trie_changes(apply_result.trie_changes); self.chain_store_update.save_outgoing_receipt( &block_hash, @@ -4708,6 +4754,13 @@ impl<'a> ChainUpdate<'a> { let mut new_extra = ChunkExtra::clone(&old_extra); *new_extra.state_root_mut() = apply_result.new_root; + self.save_flat_state_changes( + block_hash, + prev_block_hash, + height, + shard_uid.shard_id(), + &apply_result.trie_changes, + )?; self.chain_store_update.save_chunk_extra(&block_hash, &shard_uid, new_extra); self.chain_store_update.save_trie_changes(apply_result.trie_changes); @@ -4747,7 +4800,7 @@ impl<'a> ChainUpdate<'a> { ) -> Result, Error> { let prev_hash = block.header().prev_hash(); let prev_block = self.chain_store_update.get_block(prev_hash)?; - self.apply_chunk_postprocessing(block, &prev_block, apply_chunks_results, false)?; + self.apply_chunk_postprocessing(block, &prev_block, apply_chunks_results)?; let BlockPreprocessInfo { is_caught_up, @@ -4830,23 +4883,6 @@ impl<'a> ChainUpdate<'a> { } } } - - // TODO: fill in the logic here for which shards we need to update flat storage for - // only need shards that we have processed in this block (not the shards to catch up) - let flat_storage_shards = vec![]; - for shard_id in flat_storage_shards { - if let Some(_flat_storage_state) = - self.runtime_adapter.get_flat_storage_state_for_shard(shard_id) - { - // TODO: fill in the correct new head - // if let Some(_update) = flat_storage_state.update_head(&CryptoHash::default()) { - // TODO: add this update to chain update - // } - } else { - // TODO: some error handling code here. Should probably return an error (or panic?) - // here if the flat storage doesn't exist. - } - } Ok(res) } diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index 8cd7ef9df38..7a1e2ec0337 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -42,6 +42,8 @@ use near_store::{ use crate::types::{Block, BlockHeader, LatestKnown}; use crate::{byzantine_assert, RuntimeAdapter}; use near_store::db::StoreStatistics; +#[cfg(feature = "protocol_feature_flat_state")] +use near_store::flat_state::{BlockInfo, ChainAccessForFlatStorage}; use std::sync::Arc; /// lru cache size @@ -1118,6 +1120,23 @@ impl ChainStoreAccess for ChainStore { } } +#[cfg(feature = "protocol_feature_flat_state")] +impl ChainAccessForFlatStorage for ChainStore { + fn get_block_info(&self, block_hash: &CryptoHash) -> BlockInfo { + let header = self.get_block_header(block_hash).unwrap(); + BlockInfo { hash: *block_hash, height: header.height(), prev_hash: *header.prev_hash() } + } + + fn get_block_hashes_at_height(&self, height: BlockHeight) -> HashSet { + self.get_all_block_hashes_by_height(height) + .unwrap() + .values() + .flatten() + .copied() + .collect::>() + } +} + /// Cache update for ChainStore #[derive(Default)] struct ChainStoreCacheUpdate { diff --git a/core/primitives/src/errors.rs b/core/primitives/src/errors.rs index 260bc56085a..5606566c428 100644 --- a/core/primitives/src/errors.rs +++ b/core/primitives/src/errors.rs @@ -84,6 +84,8 @@ pub enum StorageError { /// panic in every place that produces this error. /// We can check if db is corrupted by verifying everything in the state trie. StorageInconsistentState(String), + /// Error from flat storage + FlatStorageError(String), } impl std::fmt::Display for StorageError { diff --git a/core/store/src/flat_state.rs b/core/store/src/flat_state.rs index 284f8906c74..bdfa8c2caf3 100644 --- a/core/store/src/flat_state.rs +++ b/core/store/src/flat_state.rs @@ -30,9 +30,20 @@ const POISONED_LOCK_ERR: &str = "The lock was poisoned."; #[cfg(feature = "protocol_feature_flat_state")] const BORSH_ERR: &str = "Borsh cannot fail"; +#[derive(strum::AsRefStr, Debug)] +pub enum FlatStorageError { + BlockNotFound, +} + +impl From for StorageError { + fn from(err: FlatStorageError) -> Self { + StorageError::FlatStorageError(err.as_ref().to_string()) + } +} + #[cfg(feature = "protocol_feature_flat_state")] mod imp { - use crate::flat_state::{FlatStorageState, POISONED_LOCK_ERR}; + use crate::flat_state::{store_helper, FlatStorageState, POISONED_LOCK_ERR}; use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; use near_primitives::state::ValueRef; @@ -95,16 +106,7 @@ mod imp { }; } - let raw_ref = self - .store - .get(crate::DBCol::FlatState, key) - .map_err(|_| StorageError::StorageInternalError); - match raw_ref? { - Some(bytes) => ValueRef::decode(&bytes) - .map(Some) - .map_err(|_| StorageError::StorageInternalError), - None => Ok(None), - } + store_helper::get_state(&self.store, key) } } @@ -285,22 +287,24 @@ pub struct KeyForFlatStateDelta { /// Delta of the state for some shard and block, stores mapping from keys to value refs or None, if key was removed in /// this block. -#[derive(BorshSerialize, BorshDeserialize)] -pub struct FlatStateDelta(pub HashMap, Option>); +#[derive(BorshSerialize, BorshDeserialize, Default, Debug)] +pub struct FlatStateDelta(HashMap, Option>); -impl FlatStateDelta { - pub fn new() -> Self { - Self(HashMap::new()) +impl From<[(Vec, Option); N]> for FlatStateDelta { + fn from(arr: [(Vec, Option); N]) -> Self { + Self(HashMap::from(arr)) } +} +impl FlatStateDelta { /// Returns `Some(Option)` from delta for the given key. If key is not present, returns None. pub fn get(&self, key: &[u8]) -> Option> { self.0.get(key).cloned() } /// Merge two deltas. Values from `other` should override values from `self`. - pub fn merge(&mut self, other: Self) { - self.0.extend(other.0) + pub fn merge(&mut self, other: &Self) { + self.0.extend(other.0.iter().map(|(k, v)| (k.clone(), v.clone()))) } /// Creates delta using raw state changes for some block. @@ -333,16 +337,7 @@ impl FlatStateDelta { #[cfg(feature = "protocol_feature_flat_state")] pub fn apply_to_flat_state(self, store_update: &mut StoreUpdate) { for (key, value) in self.0.into_iter() { - match value { - Some(value) => { - store_update - .set_ser(crate::DBCol::FlatState, &key, &value) - .expect("Borsh cannot fail"); - } - None => { - store_update.delete(crate::DBCol::FlatState, &key); - } - } + store_helper::set_state(store_update, key, value).expect(BORSH_ERR); } } @@ -350,8 +345,7 @@ impl FlatStateDelta { pub fn apply_to_flat_state(self, _store_update: &mut StoreUpdate) {} } -#[cfg(feature = "protocol_feature_flat_state")] -use near_primitives::block_header::BlockHeader; +use near_primitives::errors::StorageError; use std::sync::{Arc, RwLock}; /// FlatStorageState stores information on which blocks flat storage current supports key lookups on. @@ -392,7 +386,7 @@ struct FlatStorageStateInner { /// State deltas for all blocks supported by this flat storage. /// All these deltas here are stored on disk too. #[allow(unused)] - deltas: HashMap, + deltas: HashMap>, } #[cfg(feature = "protocol_feature_flat_state")] @@ -400,18 +394,22 @@ pub mod store_helper { use crate::flat_state::KeyForFlatStateDelta; use crate::{FlatStateDelta, Store, StoreUpdate}; use borsh::BorshSerialize; + use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; + use near_primitives::state::ValueRef; use near_primitives::types::ShardId; + use std::sync::Arc; pub(crate) fn get_delta( store: &Store, shard_id: ShardId, block_hash: CryptoHash, - ) -> Result, crate::StorageError> { + ) -> Result>, StorageError> { let key = KeyForFlatStateDelta { shard_id, block_hash }; - store - .get_ser(crate::DBCol::FlatStateDeltas, &key.try_to_vec().unwrap()) - .map_err(|_| crate::StorageError::StorageInternalError) + Ok(store + .get_ser::(crate::DBCol::FlatStateDeltas, &key.try_to_vec().unwrap()) + .map_err(|_| StorageError::StorageInternalError)? + .map(|delta| Arc::new(delta))) } pub fn set_delta( @@ -419,11 +417,11 @@ pub mod store_helper { shard_id: ShardId, block_hash: CryptoHash, delta: &FlatStateDelta, - ) -> Result<(), crate::StorageError> { + ) -> Result<(), StorageError> { let key = KeyForFlatStateDelta { shard_id, block_hash }; store_update .set_ser(crate::DBCol::FlatStateDeltas, &key.try_to_vec().unwrap(), delta) - .map_err(|_| crate::StorageError::StorageInternalError) + .map_err(|_| StorageError::StorageInternalError) } pub(crate) fn get_flat_head(store: &Store, shard_id: ShardId) -> CryptoHash { @@ -438,44 +436,110 @@ pub mod store_helper { .set_ser(crate::DBCol::FlatStateMisc, &shard_id.try_to_vec().unwrap(), val) .expect("Error writing flat head from storage") } + + pub(crate) fn get_state(store: &Store, key: &[u8]) -> Result, StorageError> { + let raw_ref = + store.get(crate::DBCol::FlatState, key).map_err(|_| StorageError::StorageInternalError); + match raw_ref? { + Some(bytes) => { + ValueRef::decode(&bytes).map(Some).map_err(|_| StorageError::StorageInternalError) + } + None => Ok(None), + } + } + + pub(crate) fn set_state( + store_update: &mut StoreUpdate, + key: Vec, + value: Option, + ) -> Result<(), StorageError> { + match value { + Some(value) => store_update + .set_ser(crate::DBCol::FlatState, &key, &value) + .map_err(|_| StorageError::StorageInternalError), + None => Ok(store_update.delete(crate::DBCol::FlatState, &key)), + } + } +} + +// Unfortunately we don't have access to ChainStore inside this file because of package +// dependencies, so we create this trait that provides the functions that FlatStorageState needs +// to access chain information +#[cfg(feature = "protocol_feature_flat_state")] +pub trait ChainAccessForFlatStorage { + fn get_block_info(&self, block_hash: &CryptoHash) -> BlockInfo; + fn get_block_hashes_at_height(&self, block_height: BlockHeight) -> HashSet; +} + +#[cfg(feature = "protocol_feature_flat_state")] +impl FlatStorageStateInner { + fn get_deltas_between_blocks( + &self, + target_block_hash: &CryptoHash, + ) -> Result>, FlatStorageError> { + let flat_state_head: CryptoHash = self.flat_head; + let flat_head_info = self.blocks.get(&flat_state_head).unwrap(); + + let mut block_hash = target_block_hash.clone(); + let mut deltas = vec![]; + while block_hash != flat_state_head { + let block_info = self.blocks.get(&block_hash).ok_or(FlatStorageError::BlockNotFound)?; + + if block_info.height < flat_head_info.height { + return Err(FlatStorageError::BlockNotFound); + } + + let delta = self + .deltas + .get(&block_hash) + .unwrap_or_else(|| panic!("block delta for {:?} is not available", block_hash)); + deltas.push(delta.clone()); + + block_hash = block_info.prev_hash; + } + + Ok(deltas) + } } impl FlatStorageState { /// Create a new FlatStorageState for `shard_id`. /// Flat head is initialized to be what is stored on storage. - /// We also load all blocks with height between flat head to the current chain head, + /// We also load all blocks with height between flat head to `latest_block_height` /// including those on forks into the returned FlatStorageState. #[cfg(feature = "protocol_feature_flat_state")] pub fn new( store: Store, - chain_head_height: BlockHeight, + shard_id: ShardId, + latest_block_height: BlockHeight, // Unfortunately we don't have access to ChainStore inside this file because of package // dependencies, so we pass these functions in to access chain info - hash_to_header: &dyn Fn(&CryptoHash) -> BlockHeader, - height_to_hashes: &dyn Fn(BlockHeight) -> HashSet, - shard_id: ShardId, + chain_access: &dyn ChainAccessForFlatStorage, ) -> Self { let flat_head = store_helper::get_flat_head(&store, shard_id); - let header = hash_to_header(&flat_head); - let flat_head_height = header.height(); + let flat_head_info = chain_access.get_block_info(&flat_head); + let flat_head_height = flat_head_info.height; let mut blocks = HashMap::from([( flat_head, - BlockInfo { hash: flat_head, height: flat_head_height, prev_hash: *header.prev_hash() }, + BlockInfo { + hash: flat_head, + height: flat_head_height, + prev_hash: flat_head_info.prev_hash, + }, )]); let mut deltas = HashMap::new(); - for height in flat_head_height + 1..=chain_head_height { - for hash in height_to_hashes(height) { - let header = hash_to_header(&hash); - let prev_hash = *header.prev_hash(); + for height in flat_head_height + 1..=latest_block_height { + for hash in chain_access.get_block_hashes_at_height(height) { + let block_info = chain_access.get_block_info(&hash); assert!( - blocks.contains_key(&prev_hash), + blocks.contains_key(&block_info.prev_hash), "Can't find a path from the current flat storage root {:?}@{} to block {:?}@{}", flat_head, flat_head_height, hash, - header.height() + block_info.height ); - blocks.insert(hash, BlockInfo { hash, height: header.height(), prev_hash }); + blocks.insert(hash, block_info); } } for hash in blocks.keys() { @@ -497,76 +561,14 @@ impl FlatStorageState { } /// Get deltas for blocks, ordered from `self.block_hash` to flat state head (backwards chain order). - /// If sequence of deltas contains final block, head is moved to this block and all deltas until new head are - /// applied to flat state. - // TODO (#7327): move updating flat state head to block postprocessing. - // TODO (#7327): come up how the flat state head and tail should be positioned. // TODO (#7327): implement garbage collection of old deltas. - // TODO (#7327): cache deltas to speed up multiple DB reads. #[cfg(feature = "protocol_feature_flat_state")] fn get_deltas_between_blocks( &self, target_block_hash: &CryptoHash, - ) -> Result, crate::StorageError> { + ) -> Result>, FlatStorageError> { let guard = self.0.write().expect(POISONED_LOCK_ERR); - let flat_state_head = store_helper::get_flat_head(&guard.store, guard.shard_id); - - let block_header: BlockHeader = guard - .store - .get_ser(crate::DBCol::BlockHeader, target_block_hash.as_ref()) - .map_err(|_| crate::StorageError::StorageInternalError)? - .unwrap(); - let final_block_hash = block_header.last_final_block().clone(); - - let mut block_hash = target_block_hash.clone(); - let mut deltas = vec![]; - let mut deltas_to_apply = vec![]; - let mut found_final_block = false; - while block_hash != flat_state_head { - if block_hash == final_block_hash { - assert!(!found_final_block); - found_final_block = true; - } - - let delta = store_helper::get_delta(&guard.store, guard.shard_id, block_hash)?; - match delta { - Some(delta) => { - if found_final_block { - deltas_to_apply.push(delta); - } else { - deltas.push(delta); - } - } - None => {} - } - - let block_header: BlockHeader = guard - .store - .get_ser(crate::DBCol::BlockHeader, block_hash.as_ref()) - .map_err(|_| crate::StorageError::StorageInternalError)? - .unwrap(); - block_hash = block_header.prev_hash().clone(); - } - - let storage = guard.store.storage.clone(); - std::mem::drop(guard); - - if found_final_block { - let mut store_update = StoreUpdate::new(storage); - let mut delta = FlatStateDelta::new(); - for new_delta in deltas_to_apply.drain(..).rev() { - delta.merge(new_delta); - } - delta.apply_to_flat_state(&mut store_update); - match self.update_flat_head(&final_block_hash) { - Some(new_store_update) => { - store_update.merge(new_store_update); - } - None => {} - }; - store_update.commit().map_err(|_| crate::StorageError::StorageInternalError)? - } - Ok(deltas) + guard.get_deltas_between_blocks(target_block_hash) } #[cfg(not(feature = "protocol_feature_flat_state"))] @@ -578,14 +580,23 @@ impl FlatStorageState { Ok(vec![]) } - // Update the head of the flat storage, this might require updating the flat state stored on disk. - // Returns a StoreUpdate for the disk update if there is any + // Update the head of the flat storage, including updating the flat state in memory and on disk + // and updating the flat state to reflect the state at the new head #[cfg(feature = "protocol_feature_flat_state")] - pub fn update_flat_head(&self, new_head: &CryptoHash) -> Option { - let guard = self.0.write().expect(POISONED_LOCK_ERR); + pub fn update_flat_head(&self, new_head: &CryptoHash) -> Result<(), FlatStorageError> { + let mut guard = self.0.write().expect(POISONED_LOCK_ERR); + let deltas = guard.get_deltas_between_blocks(new_head)?; + let mut merged_delta = FlatStateDelta::default(); + for delta in deltas.into_iter().rev() { + merged_delta.merge(delta.as_ref()); + } + + guard.flat_head = *new_head; let mut store_update = StoreUpdate::new(guard.store.storage.clone()); store_helper::set_flat_head(&mut store_update, guard.shard_id, new_head); - Some(store_update) + merged_delta.apply_to_flat_state(&mut store_update); + store_update.commit().expect(BORSH_ERR); + Ok(()) } #[cfg(not(feature = "protocol_feature_flat_state"))] @@ -593,37 +604,117 @@ impl FlatStorageState { None } - /// Adds a delta for a block to flat storage, returns a StoreUpdate. + /// Adds a block (including the block delta and block info) to flat storage, + /// returns a StoreUpdate because we also stores the delta on disk #[cfg(feature = "protocol_feature_flat_state")] - pub fn add_delta( + pub fn add_block( &self, block_hash: &CryptoHash, delta: FlatStateDelta, + block: BlockInfo, ) -> Result { - let guard = self.0.write().expect(POISONED_LOCK_ERR); + let mut guard = self.0.write().expect(POISONED_LOCK_ERR); let mut store_update = StoreUpdate::new(guard.store.storage.clone()); store_helper::set_delta(&mut store_update, guard.shard_id, block_hash.clone(), &delta)?; + guard.deltas.insert(*block_hash, Arc::new(delta)); + guard.blocks.insert(*block_hash, block); Ok(store_update) } #[cfg(not(feature = "protocol_feature_flat_state"))] - pub fn add_delta( + pub fn add_block( &self, _block_hash: &CryptoHash, _delta: FlatStateDelta, + _block_info: BlockInfo, ) -> Result { panic!("not implemented") } + + #[cfg(feature = "protocol_feature_flat_state")] + pub fn get_flat_head(&self) -> CryptoHash { + let guard = self.0.read().expect(POISONED_LOCK_ERR); + guard.flat_head + } + + #[cfg(not(feature = "protocol_feature_flat_state"))] + pub fn get_flat_head(&self) -> CryptoHash { + CryptoHash::default() + } } #[cfg(test)] +#[cfg(feature = "protocol_feature_flat_state")] mod tests { + use crate::flat_state::{store_helper, BlockInfo, ChainAccessForFlatStorage, FlatStorageState}; + use crate::test_utils::create_test_store; use crate::FlatStateDelta; + use borsh::BorshSerialize; + use near_primitives::borsh::maybestd::collections::HashSet; + use near_primitives::hash::{hash, CryptoHash}; use near_primitives::state::ValueRef; use near_primitives::trie_key::TrieKey; - use near_primitives::types::{RawStateChange, RawStateChangesWithTrieKey, StateChangeCause}; + use near_primitives::types::{ + BlockHeight, RawStateChange, RawStateChangesWithTrieKey, StateChangeCause, + }; use std::collections::HashMap; + struct MockChain { + height_to_hashes: HashMap, + blocks: HashMap, + head_height: BlockHeight, + } + + impl ChainAccessForFlatStorage for MockChain { + fn get_block_info(&self, block_hash: &CryptoHash) -> BlockInfo { + self.blocks.get(block_hash).unwrap().clone() + } + + fn get_block_hashes_at_height(&self, block_height: BlockHeight) -> HashSet { + HashSet::from([self.get_block_hash(block_height)]) + } + } + + impl MockChain { + fn block_hash(height: BlockHeight) -> CryptoHash { + hash(&height.try_to_vec().unwrap()) + } + + // create a chain with no forks with length n + fn linear_chain(n: usize) -> MockChain { + let hashes: Vec<_> = (0..n).map(|i| MockChain::block_hash(i as BlockHeight)).collect(); + let height_to_hashes: HashMap<_, _> = + hashes.iter().enumerate().map(|(k, v)| (k as BlockHeight, *v)).collect(); + let blocks = (0..n) + .map(|i| { + let prev_hash = if i == 0 { CryptoHash::default() } else { hashes[i - 1] }; + (hashes[i], BlockInfo { hash: hashes[i], height: i as BlockHeight, prev_hash }) + }) + .collect(); + MockChain { height_to_hashes, blocks, head_height: n as BlockHeight - 1 } + } + + fn get_block_hash(&self, height: BlockHeight) -> CryptoHash { + *self.height_to_hashes.get(&height).unwrap() + } + + /// create a new block on top the current chain head, return the new block hash + fn create_block(&mut self) -> CryptoHash { + let hash = MockChain::block_hash(self.head_height + 1); + self.height_to_hashes.insert(self.head_height + 1, hash); + self.blocks.insert( + hash, + BlockInfo { + hash, + height: self.head_height + 1, + prev_hash: self.get_block_hash(self.head_height), + }, + ); + self.head_height += 1; + hash + } + } + /// Check correctness of creating `FlatStateDelta` from state changes. #[test] fn flat_state_delta_creation() { @@ -705,19 +796,19 @@ mod tests { /// different keys. #[test] fn flat_state_delta_merge() { - let mut delta = FlatStateDelta(HashMap::from([ + let mut delta = FlatStateDelta::from([ (vec![1], Some(ValueRef::new(&[4]))), (vec![2], Some(ValueRef::new(&[5]))), (vec![3], None), (vec![4], Some(ValueRef::new(&[6]))), - ])); - let delta_new = FlatStateDelta(HashMap::from([ + ]); + let delta_new = FlatStateDelta::from([ (vec![2], Some(ValueRef::new(&[7]))), (vec![3], Some(ValueRef::new(&[8]))), (vec![4], None), (vec![5], Some(ValueRef::new(&[9]))), - ])); - delta.merge(delta_new); + ]); + delta.merge(&delta_new); assert_eq!(delta.get(&[1]), Some(Some(ValueRef::new(&[4])))); assert_eq!(delta.get(&[2]), Some(Some(ValueRef::new(&[7])))); @@ -726,6 +817,53 @@ mod tests { assert_eq!(delta.get(&[5]), Some(Some(ValueRef::new(&[9])))); } + #[test] + fn flat_storage_state_sanity() { + let mut chain = MockChain::linear_chain(10); + let store = create_test_store(); + let mut store_update = store.store_update(); + store_helper::set_flat_head(&mut store_update, 0, &chain.get_block_hash(0)); + for i in 0..10 { + store_helper::set_delta( + &mut store_update, + 0, + chain.get_block_hash(i), + &FlatStateDelta::from([(vec![1], Some(ValueRef::new(&[i as u8])))]), + ) + .unwrap(); + } + store_update.commit().unwrap(); + + let flat_storage_state = FlatStorageState::new(store.clone(), 0, 9, &chain); + // TODO: create a flat state and test the flat state + for i in 0..10 { + let deltas = + flat_storage_state.get_deltas_between_blocks(&chain.get_block_hash(i)).unwrap(); + assert_eq!(deltas.len(), i as usize); + } + let hash = chain.create_block(); + let store_update = flat_storage_state + .add_block( + &hash, + FlatStateDelta::from([(vec![1], None), (vec![2], Some(ValueRef::new(&[1])))]), + chain.get_block_info(&hash), + ) + .unwrap(); + store_update.commit().unwrap(); + let deltas = + flat_storage_state.get_deltas_between_blocks(&chain.get_block_hash(10)).unwrap(); + assert_eq!(deltas.len(), 10); + flat_storage_state.update_flat_head(&chain.get_block_hash(5)).unwrap(); + assert_eq!(store_helper::get_state(&store, &[1]).unwrap(), Some(ValueRef::new(&[5]))); + + let deltas = + flat_storage_state.get_deltas_between_blocks(&chain.get_block_hash(10)).unwrap(); + assert_eq!(deltas.len(), 5); + flat_storage_state.update_flat_head(&chain.get_block_hash(10)).unwrap(); + assert_eq!(store_helper::get_state(&store, &[1]).unwrap(), None); + assert_eq!(store_helper::get_state(&store, &[2]).unwrap(), Some(ValueRef::new(&[1]))); + } + #[test] fn flat_state_apply_single_delta() { // TODO (#7327): check this scenario after implementing flat storage state: diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs index d35d34c2d5c..407bf6d9870 100644 --- a/nearcore/src/runtime/mod.rs +++ b/nearcore/src/runtime/mod.rs @@ -519,7 +519,10 @@ impl NightshadeRuntime { state_patch, ) .map_err(|e| match e { - RuntimeError::InvalidTxError(_) => Error::InvalidTransactions, + RuntimeError::InvalidTxError(err) => { + tracing::warn!("Invalid tx {:?}", err); + Error::InvalidTransactions + } // TODO(#2152): process gracefully RuntimeError::BalanceMismatchError(e) => panic!("{}", e), // TODO(#2152): process gracefully diff --git a/tools/mock-node/Cargo.toml b/tools/mock-node/Cargo.toml index 5f099fb4ace..e856dc95b5d 100644 --- a/tools/mock-node/Cargo.toml +++ b/tools/mock-node/Cargo.toml @@ -56,4 +56,5 @@ required-features = ["mock_node"] [features] test_features = ["nearcore/test_features"] +protocol_feature_flat_state = ["nearcore/protocol_feature_flat_state"] mock_node = ["near-chain/mock_node", "near-epoch-manager/mock_node"] From 87a3afec35dadb9e1d02992f81d9fcd689c6aaca Mon Sep 17 00:00:00 2001 From: Min Zhang Date: Thu, 22 Sep 2022 14:05:33 -0400 Subject: [PATCH 2/7] fix a compilation error --- core/store/src/flat_state.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/store/src/flat_state.rs b/core/store/src/flat_state.rs index bdfa8c2caf3..f2e91f4b313 100644 --- a/core/store/src/flat_state.rs +++ b/core/store/src/flat_state.rs @@ -351,7 +351,7 @@ use std::sync::{Arc, RwLock}; /// FlatStorageState stores information on which blocks flat storage current supports key lookups on. /// Note that this struct is shared by multiple threads, the chain thread, threads that apply chunks, /// and view client, so the implementation here must be thread safe and must have interior mutability, -/// thus all methods in this class are with &self intead of &mut self. +/// thus all methods in this class are with &self instead of &mut self. #[derive(Clone)] pub struct FlatStorageState(Arc>); @@ -600,8 +600,8 @@ impl FlatStorageState { } #[cfg(not(feature = "protocol_feature_flat_state"))] - pub fn update_flat_head(&self, _new_head: &CryptoHash) -> Option { - None + pub fn update_flat_head(&self, _new_head: &CryptoHash) -> Result<(), FlatStorageError> { + Ok(()) } /// Adds a block (including the block delta and block info) to flat storage, From 329576aa66cd5e410b8393183091caac3d5728fa Mon Sep 17 00:00:00 2001 From: Min Zhang Date: Thu, 22 Sep 2022 16:42:26 -0400 Subject: [PATCH 3/7] fix tests --- chain/chain/src/chain.rs | 32 +++++-- core/store/src/flat_state.rs | 83 +++++++++++++------ core/store/src/trie/mod.rs | 2 +- .../src/tests/client/process_blocks.rs | 2 + nearcore/src/runtime/mod.rs | 7 +- 5 files changed, 89 insertions(+), 37 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index bd09fa4b7b2..e902cf76c47 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -21,6 +21,7 @@ use near_primitives::challenge::{ MaybeEncodedShardChunk, PartialState, SlashedValidator, }; use near_primitives::checked_feature; +use near_primitives::errors::StorageError; use near_primitives::hash::{hash, CryptoHash}; use near_primitives::merkle::{ combine_hash, merklize, verify_path, Direction, MerklePath, MerklePathItem, PartialMerkleTree, @@ -628,7 +629,8 @@ impl Chain { shard_id, block_head.last_block_hash, &FlatStateDelta::default(), - )?; + ) + .map_err(|e| StorageError::from(e))?; } store_update.merge(tmp_store_update); } @@ -2086,7 +2088,7 @@ impl Chain { // TODO (#7327): support flat storage for state sync and block catchups for shard_id in 0..self.runtime_adapter.num_shards(block.header().epoch_id())? { if self.runtime_adapter.cares_about_shard( - None, + me.as_ref(), block.header().prev_hash(), shard_id, true, @@ -2094,8 +2096,11 @@ impl Chain { if let Some(flat_storage_state) = self.runtime_adapter.get_flat_storage_state_for_shard(shard_id) { - let new_flat_head = block.header().last_final_block(); - flat_storage_state.update_flat_head(new_flat_head).unwrap_or_else(|_| { + let mut new_flat_head = *block.header().last_final_block(); + if new_flat_head == CryptoHash::default() { + new_flat_head = *self.genesis.hash(); + } + flat_storage_state.update_flat_head(&new_flat_head).unwrap_or_else(|_| { panic!( "Cannot update flat head from {:?} to {:?}", flat_storage_state.get_flat_head(), @@ -3127,7 +3132,7 @@ impl Chain { if self.blocks_in_processing.has_blocks_to_catch_up(&queued_block) { processed_blocks.insert(queued_block, results); } else { - match self.block_catch_up_postprocess(&queued_block, results) { + match self.block_catch_up_postprocess(me, &queued_block, results) { Ok(_) => { let mut saw_one = false; for next_block_hash in @@ -3180,13 +3185,14 @@ impl Chain { fn block_catch_up_postprocess( &mut self, + me: &Option, block_hash: &CryptoHash, results: Vec>, ) -> Result<(), Error> { let block = self.store.get_block(block_hash)?; let prev_block = self.store.get_block(block.header().prev_hash())?; let mut chain_update = self.chain_update(); - chain_update.apply_chunk_postprocessing(&block, &prev_block, results)?; + chain_update.apply_chunk_postprocessing(me, &block, &prev_block, results)?; chain_update.commit()?; Ok(()) } @@ -4515,6 +4521,7 @@ impl<'a> ChainUpdate<'a> { fn apply_chunk_postprocessing( &mut self, + me: &Option, block: &Block, prev_block: &Block, apply_results: Vec>, @@ -4522,6 +4529,7 @@ impl<'a> ChainUpdate<'a> { let _span = tracing::debug_span!(target: "chain", "apply_chunk_postprocessing").entered(); for result in apply_results { self.process_apply_chunk_result( + me, result?, *block.hash(), block.header().height(), @@ -4660,6 +4668,7 @@ impl<'a> ChainUpdate<'a> { #[allow(unused_variables)] fn save_flat_state_changes( &mut self, + me: &Option, block_hash: CryptoHash, prev_hash: CryptoHash, height: BlockHeight, @@ -4667,13 +4676,15 @@ impl<'a> ChainUpdate<'a> { trie_changes: &WrappedTrieChanges, ) -> Result<(), Error> { #[cfg(feature = "protocol_feature_flat_state")] - if self.runtime_adapter.cares_about_shard(None, &block_hash, shard_id, true) { + if self.runtime_adapter.cares_about_shard(me.as_ref(), &prev_hash, shard_id, true) { if let Some(chain_flat_storage) = self.runtime_adapter.get_flat_storage_state_for_shard(shard_id) { let delta = FlatStateDelta::from_state_changes(&trie_changes.state_changes()); let block_info = flat_state::BlockInfo { hash: block_hash, height, prev_hash }; - let store_update = chain_flat_storage.add_block(&block_hash, delta, block_info)?; + let store_update = chain_flat_storage + .add_block(&block_hash, delta, block_info) + .map_err(|e| StorageError::from(e))?; self.chain_store_update.merge(store_update); } } @@ -4683,6 +4694,7 @@ impl<'a> ChainUpdate<'a> { /// Processed results of applying chunk fn process_apply_chunk_result( &mut self, + me: &Option, result: ApplyChunkResult, block_hash: CryptoHash, height: BlockHeight, @@ -4715,6 +4727,7 @@ impl<'a> ChainUpdate<'a> { // Right now, we don't implement flat storage for catchup, so we only store // the delta if we are not catching up self.save_flat_state_changes( + me, block_hash, prev_block_hash, height, @@ -4755,6 +4768,7 @@ impl<'a> ChainUpdate<'a> { *new_extra.state_root_mut() = apply_result.new_root; self.save_flat_state_changes( + me, block_hash, prev_block_hash, height, @@ -4800,7 +4814,7 @@ impl<'a> ChainUpdate<'a> { ) -> Result, Error> { let prev_hash = block.header().prev_hash(); let prev_block = self.chain_store_update.get_block(prev_hash)?; - self.apply_chunk_postprocessing(block, &prev_block, apply_chunks_results)?; + self.apply_chunk_postprocessing(me, block, &prev_block, apply_chunks_results)?; let BlockPreprocessInfo { is_caught_up, diff --git a/core/store/src/flat_state.rs b/core/store/src/flat_state.rs index f2e91f4b313..f3d055a819c 100644 --- a/core/store/src/flat_state.rs +++ b/core/store/src/flat_state.rs @@ -32,19 +32,26 @@ const BORSH_ERR: &str = "Borsh cannot fail"; #[derive(strum::AsRefStr, Debug)] pub enum FlatStorageError { - BlockNotFound, + /// This means we can't find a path from `flat_head` to the block + BlockNotSupported(CryptoHash), + StorageInternalError, } impl From for StorageError { fn from(err: FlatStorageError) -> Self { - StorageError::FlatStorageError(err.as_ref().to_string()) + match err { + FlatStorageError::BlockNotSupported(hash) => StorageError::FlatStorageError(format!( + "FlatStorage does not support this block {:?}", + hash + )), + FlatStorageError::StorageInternalError => StorageError::StorageInternalError, + } } } #[cfg(feature = "protocol_feature_flat_state")] mod imp { - use crate::flat_state::{store_helper, FlatStorageState, POISONED_LOCK_ERR}; - use near_primitives::errors::StorageError; + use crate::flat_state::{store_helper, FlatStorageError, FlatStorageState, POISONED_LOCK_ERR}; use near_primitives::hash::CryptoHash; use near_primitives::state::ValueRef; use near_primitives::types::ShardId; @@ -92,7 +99,7 @@ mod imp { /// could charge users for the value length before loading the value. // TODO (#7327): support different roots (or block hashes). // TODO (#7327): consider inlining small values, so we could use only one db access. - pub fn get_ref(&self, key: &[u8]) -> Result, StorageError> { + pub fn get_ref(&self, key: &[u8]) -> Result, FlatStorageError> { // Take deltas ordered from `self.block_hash` to flat state head. // In other words, order of deltas is the opposite of the order of blocks in chain. let deltas = self.flat_storage_state.get_deltas_between_blocks(&self.block_hash)?; @@ -369,6 +376,19 @@ pub struct BlockInfo { pub prev_hash: CryptoHash, } +// FlatStorageState need to support concurrent access and be consistent if node crashes or restarts, +// so we make sure to keep the following invariants in our implementation. +// - `flat_head` is stored on disk. The value of flat_head in memory and on disk should always +// be consistent with the flat state stored in `DbCol::FlatState` on disk. This means, updates to +// these values much be atomic from the outside. +// - `blocks` and `deltas` store the same set of blocks, which is the set of blocks that +// `FlatStorageState::get_deltas_between_blocks` supports. For any block in `blocks`, `flat_head` +// must be on the same chain as the block and all blocks between `flat_head` and the block must +// also be in `blocks`. +// - All deltas in `deltas` are stored on disk. And if a block is accepted by chain, its deltas +// must be stored on disk as well, if the block is children of `flat_head`. +// This makes sure that when a node restarts, FlatStorageState can load deltas for all blocks +// after the `flat_head` block successfully. struct FlatStorageStateInner { #[allow(unused)] store: Store, @@ -391,10 +411,9 @@ struct FlatStorageStateInner { #[cfg(feature = "protocol_feature_flat_state")] pub mod store_helper { - use crate::flat_state::KeyForFlatStateDelta; + use crate::flat_state::{FlatStorageError, KeyForFlatStateDelta}; use crate::{FlatStateDelta, Store, StoreUpdate}; use borsh::BorshSerialize; - use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; use near_primitives::state::ValueRef; use near_primitives::types::ShardId; @@ -404,11 +423,11 @@ pub mod store_helper { store: &Store, shard_id: ShardId, block_hash: CryptoHash, - ) -> Result>, StorageError> { + ) -> Result>, FlatStorageError> { let key = KeyForFlatStateDelta { shard_id, block_hash }; Ok(store .get_ser::(crate::DBCol::FlatStateDeltas, &key.try_to_vec().unwrap()) - .map_err(|_| StorageError::StorageInternalError)? + .map_err(|_| FlatStorageError::StorageInternalError)? .map(|delta| Arc::new(delta))) } @@ -417,11 +436,11 @@ pub mod store_helper { shard_id: ShardId, block_hash: CryptoHash, delta: &FlatStateDelta, - ) -> Result<(), StorageError> { + ) -> Result<(), FlatStorageError> { let key = KeyForFlatStateDelta { shard_id, block_hash }; store_update .set_ser(crate::DBCol::FlatStateDeltas, &key.try_to_vec().unwrap(), delta) - .map_err(|_| StorageError::StorageInternalError) + .map_err(|_| FlatStorageError::StorageInternalError) } pub(crate) fn get_flat_head(store: &Store, shard_id: ShardId) -> CryptoHash { @@ -437,13 +456,17 @@ pub mod store_helper { .expect("Error writing flat head from storage") } - pub(crate) fn get_state(store: &Store, key: &[u8]) -> Result, StorageError> { - let raw_ref = - store.get(crate::DBCol::FlatState, key).map_err(|_| StorageError::StorageInternalError); + pub(crate) fn get_state( + store: &Store, + key: &[u8], + ) -> Result, FlatStorageError> { + let raw_ref = store + .get(crate::DBCol::FlatState, key) + .map_err(|_| FlatStorageError::StorageInternalError); match raw_ref? { - Some(bytes) => { - ValueRef::decode(&bytes).map(Some).map_err(|_| StorageError::StorageInternalError) - } + Some(bytes) => ValueRef::decode(&bytes) + .map(Some) + .map_err(|_| FlatStorageError::StorageInternalError), None => Ok(None), } } @@ -452,11 +475,11 @@ pub mod store_helper { store_update: &mut StoreUpdate, key: Vec, value: Option, - ) -> Result<(), StorageError> { + ) -> Result<(), FlatStorageError> { match value { Some(value) => store_update .set_ser(crate::DBCol::FlatState, &key, &value) - .map_err(|_| StorageError::StorageInternalError), + .map_err(|_| FlatStorageError::StorageInternalError), None => Ok(store_update.delete(crate::DBCol::FlatState, &key)), } } @@ -483,10 +506,13 @@ impl FlatStorageStateInner { let mut block_hash = target_block_hash.clone(); let mut deltas = vec![]; while block_hash != flat_state_head { - let block_info = self.blocks.get(&block_hash).ok_or(FlatStorageError::BlockNotFound)?; + let block_info = self + .blocks + .get(&block_hash) + .ok_or(FlatStorageError::BlockNotSupported(*target_block_hash))?; if block_info.height < flat_head_info.height { - return Err(FlatStorageError::BlockNotFound); + return Err(FlatStorageError::BlockNotSupported(*target_block_hash)); } let delta = self @@ -533,7 +559,7 @@ impl FlatStorageState { let block_info = chain_access.get_block_info(&hash); assert!( blocks.contains_key(&block_info.prev_hash), - "Can't find a path from the current flat storage root {:?}@{} to block {:?}@{}", + "Can't find a path from the current flat head {:?}@{} to block {:?}@{}", flat_head, flat_head_height, hash, @@ -605,15 +631,22 @@ impl FlatStorageState { } /// Adds a block (including the block delta and block info) to flat storage, - /// returns a StoreUpdate because we also stores the delta on disk + /// returns a StoreUpdate to store the delta on disk. Node that this StoreUpdate should be + /// committed to disk in one db transaction together with the rest of changes caused by block, + /// in case the node stopped or crashed in between and a block is on chain but its delta is not + /// stored or vice versa. #[cfg(feature = "protocol_feature_flat_state")] pub fn add_block( &self, block_hash: &CryptoHash, delta: FlatStateDelta, block: BlockInfo, - ) -> Result { + ) -> Result { let mut guard = self.0.write().expect(POISONED_LOCK_ERR); + tracing::info!(target:"chain", "blocks {:?} prev_hash {:?}", guard.blocks.keys(), block.prev_hash); + if !guard.blocks.contains_key(&block.prev_hash) { + return Err(FlatStorageError::BlockNotSupported(*block_hash)); + } let mut store_update = StoreUpdate::new(guard.store.storage.clone()); store_helper::set_delta(&mut store_update, guard.shard_id, block_hash.clone(), &delta)?; guard.deltas.insert(*block_hash, Arc::new(delta)); @@ -627,7 +660,7 @@ impl FlatStorageState { _block_hash: &CryptoHash, _delta: FlatStateDelta, _block_info: BlockInfo, - ) -> Result { + ) -> Result { panic!("not implemented") } diff --git a/core/store/src/trie/mod.rs b/core/store/src/trie/mod.rs index 30627f4e30a..9fcc22703dd 100644 --- a/core/store/src/trie/mod.rs +++ b/core/store/src/trie/mod.rs @@ -876,7 +876,7 @@ impl Trie { pub fn get_ref(&self, key: &[u8]) -> Result, StorageError> { let is_delayed = is_delayed_receipt_key(key); match &self.flat_state { - Some(flat_state) if !is_delayed => flat_state.get_ref(&key), + Some(flat_state) if !is_delayed => flat_state.get_ref(&key).map_err(|e| e.into()), _ => { let key = NibbleSlice::new(key); self.lookup(key) diff --git a/integration-tests/src/tests/client/process_blocks.rs b/integration-tests/src/tests/client/process_blocks.rs index 514e939223d..7bf7381cf9c 100644 --- a/integration-tests/src/tests/client/process_blocks.rs +++ b/integration-tests/src/tests/client/process_blocks.rs @@ -2385,6 +2385,8 @@ fn test_catchup_gas_price_change() { #[test] fn test_block_execution_outcomes() { + init_test_logger(); + let epoch_length = 5; let min_gas_price = 10000; let mut genesis = Genesis::test(vec!["test0".parse().unwrap(), "test1".parse().unwrap()], 1); diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs index 407bf6d9870..be560df5f08 100644 --- a/nearcore/src/runtime/mod.rs +++ b/nearcore/src/runtime/mod.rs @@ -23,7 +23,7 @@ use near_primitives::contract::ContractCode; use near_primitives::epoch_manager::block_info::BlockInfo; use near_primitives::epoch_manager::epoch_info::EpochInfo; use near_primitives::epoch_manager::{EpochConfig, ShardConfig}; -use near_primitives::errors::{EpochError, InvalidTxError, RuntimeError}; +use near_primitives::errors::{EpochError, InvalidTxError, RuntimeError, StorageError}; use near_primitives::hash::{hash, CryptoHash}; use near_primitives::receipt::Receipt; use near_primitives::runtime::config_store::RuntimeConfigStore; @@ -1410,7 +1410,10 @@ impl RuntimeAdapter for NightshadeRuntime { ) { Ok(result) => Ok(result), Err(e) => match e { - Error::StorageError(_) => panic!("{e}"), + Error::StorageError(err) => match &err { + StorageError::FlatStorageError(_) => Err(err.into()), + _ => panic!("{err}"), + }, _ => Err(e), }, } From 1c573b92e1550d62fea562f9ba1c53b89fee5c3c Mon Sep 17 00:00:00 2001 From: Min Zhang Date: Thu, 22 Sep 2022 17:48:55 -0400 Subject: [PATCH 4/7] add comments to test and test flat state as well. Also fixed get_deltas_between_blocks --- chain/chain/src/chain.rs | 1 - core/store/src/flat_state.rs | 99 +++++++++++++++---- core/store/src/trie/mod.rs | 2 +- .../src/tests/client/process_blocks.rs | 2 + 4 files changed, 82 insertions(+), 22 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index e902cf76c47..ff7ea6331d5 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -21,7 +21,6 @@ use near_primitives::challenge::{ MaybeEncodedShardChunk, PartialState, SlashedValidator, }; use near_primitives::checked_feature; -use near_primitives::errors::StorageError; use near_primitives::hash::{hash, CryptoHash}; use near_primitives::merkle::{ combine_hash, merklize, verify_path, Direction, MerklePath, MerklePathItem, PartialMerkleTree, diff --git a/core/store/src/flat_state.rs b/core/store/src/flat_state.rs index f3d055a819c..9427b53d37d 100644 --- a/core/store/src/flat_state.rs +++ b/core/store/src/flat_state.rs @@ -51,7 +51,7 @@ impl From for StorageError { #[cfg(feature = "protocol_feature_flat_state")] mod imp { - use crate::flat_state::{store_helper, FlatStorageError, FlatStorageState, POISONED_LOCK_ERR}; + use crate::flat_state::{store_helper, FlatStorageState, POISONED_LOCK_ERR}; use near_primitives::hash::CryptoHash; use near_primitives::state::ValueRef; use near_primitives::types::ShardId; @@ -74,7 +74,8 @@ mod imp { /// It should store all trie keys and values/value refs for the state on top of /// flat_storage_state.head, except for delayed receipt keys. store: Store, - /// The block for which key-value pairs of its state will be retrieved. + /// The block for which key-value pairs of its state will be retrieved. The flat state + /// will reflect the state AFTER the block is applied. block_hash: CryptoHash, /// In-memory cache for the key value pairs stored on disk. #[allow(unused)] @@ -99,7 +100,7 @@ mod imp { /// could charge users for the value length before loading the value. // TODO (#7327): support different roots (or block hashes). // TODO (#7327): consider inlining small values, so we could use only one db access. - pub fn get_ref(&self, key: &[u8]) -> Result, FlatStorageError> { + pub fn get_ref(&self, key: &[u8]) -> Result, crate::StorageError> { // Take deltas ordered from `self.block_hash` to flat state head. // In other words, order of deltas is the opposite of the order of blocks in chain. let deltas = self.flat_storage_state.get_deltas_between_blocks(&self.block_hash)?; @@ -113,7 +114,7 @@ mod imp { }; } - store_helper::get_state(&self.store, key) + Ok(store_helper::get_state(&self.store, key)?) } } @@ -164,9 +165,15 @@ mod imp { assert!(original_value.is_none()); } - /// Creates `FlatState` for accessing flat storage data for particular shard and the given block. - /// If flat state feature was not enabled (see parallel implementation below), request was made from view client - /// or block was not provided, returns None. + /// Creates `FlatState` to access state for `shard_id` and block `block_hash`. Note that + /// the state includes changes by the block `block_hash`. + /// `block_hash`: only create FlatState if it is not None. This is a hack we have temporarily + /// to not introduce too many changes in the trie interface. + /// `is_view`: whether this flat state is used for view client. We use a separate set of caches + /// for flat state for client vs view client. For now, we don't support flat state + /// for view client, so we simply return None if `is_view` is True. + /// TODO (#7327): take block_hash as CryptoHash instead of Option + /// TODO (#7327): implement support for view_client pub fn new_flat_state_for_shard( &self, shard_id: ShardId, @@ -496,16 +503,17 @@ pub trait ChainAccessForFlatStorage { #[cfg(feature = "protocol_feature_flat_state")] impl FlatStorageStateInner { + /// Get deltas between blocks `target_block_hash`(inclusive) to flat head(inclusive), + /// in backwards chain order. Returns an error if there is no path between these two them. fn get_deltas_between_blocks( &self, target_block_hash: &CryptoHash, ) -> Result>, FlatStorageError> { - let flat_state_head: CryptoHash = self.flat_head; - let flat_head_info = self.blocks.get(&flat_state_head).unwrap(); + let flat_head_info = self.blocks.get(&self.flat_head).unwrap(); let mut block_hash = target_block_hash.clone(); let mut deltas = vec![]; - while block_hash != flat_state_head { + loop { let block_info = self .blocks .get(&block_hash) @@ -518,9 +526,15 @@ impl FlatStorageStateInner { let delta = self .deltas .get(&block_hash) + // panic here because we already checked that the block is in self.blocks, so it + // should be in self.deltas too .unwrap_or_else(|| panic!("block delta for {:?} is not available", block_hash)); deltas.push(delta.clone()); + if block_hash == self.flat_head { + break; + } + block_hash = block_info.prev_hash; } @@ -586,8 +600,8 @@ impl FlatStorageState { }))) } - /// Get deltas for blocks, ordered from `self.block_hash` to flat state head (backwards chain order). - // TODO (#7327): implement garbage collection of old deltas. + /// Get deltas between blocks `target_block_hash`(inclusive) to flat head(inclusive), + /// in backwards chain order. Returns an error if there is no path between these two them. #[cfg(feature = "protocol_feature_flat_state")] fn get_deltas_between_blocks( &self, @@ -608,6 +622,7 @@ impl FlatStorageState { // Update the head of the flat storage, including updating the flat state in memory and on disk // and updating the flat state to reflect the state at the new head + // TODO (#7327): implement garbage collection of old deltas. #[cfg(feature = "protocol_feature_flat_state")] pub fn update_flat_head(&self, new_head: &CryptoHash) -> Result<(), FlatStorageError> { let mut guard = self.0.write().expect(POISONED_LOCK_ERR); @@ -679,9 +694,12 @@ impl FlatStorageState { #[cfg(test)] #[cfg(feature = "protocol_feature_flat_state")] mod tests { - use crate::flat_state::{store_helper, BlockInfo, ChainAccessForFlatStorage, FlatStorageState}; + use crate::flat_state::{ + store_helper, BlockInfo, ChainAccessForFlatStorage, FlatStateFactory, FlatStorageState, + }; use crate::test_utils::create_test_store; use crate::FlatStateDelta; + use crate::StorageError; use borsh::BorshSerialize; use near_primitives::borsh::maybestd::collections::HashSet; use near_primitives::hash::{hash, CryptoHash}; @@ -690,6 +708,8 @@ mod tests { use near_primitives::types::{ BlockHeight, RawStateChange, RawStateChangesWithTrieKey, StateChangeCause, }; + + use assert_matches::assert_matches; use std::collections::HashMap; struct MockChain { @@ -850,8 +870,14 @@ mod tests { assert_eq!(delta.get(&[5]), Some(Some(ValueRef::new(&[9])))); } + // This test tests some basic use cases for FlatState and FlatStorageState. + // We created a linear chain with no forks, start with flat head at the genesis block, then + // moves the flat head forward, which checking that flat_state.get_ref() still returns the correct + // values and the state is being updated in store. #[test] fn flat_storage_state_sanity() { + // 1. Create a chain with 10 blocks with no forks. Set flat head to be at block 0. + // Block i sets value for key &[1] to &[i]. let mut chain = MockChain::linear_chain(10); let store = create_test_store(); let mut store_update = store.store_update(); @@ -868,12 +894,22 @@ mod tests { store_update.commit().unwrap(); let flat_storage_state = FlatStorageState::new(store.clone(), 0, 9, &chain); - // TODO: create a flat state and test the flat state + let flat_state_factory = FlatStateFactory::new(store.clone()); + flat_state_factory.add_flat_storage_state_for_shard(0, flat_storage_state); + let flat_storage_state = flat_state_factory.get_flat_storage_state_for_shard(0).unwrap(); + + // 2. Check that the flat_state at block i reads the value of key &[1] as &[i] for i in 0..10 { - let deltas = - flat_storage_state.get_deltas_between_blocks(&chain.get_block_hash(i)).unwrap(); - assert_eq!(deltas.len(), i as usize); + let block_hash = chain.get_block_hash(i); + let deltas = flat_storage_state.get_deltas_between_blocks(&block_hash).unwrap(); + assert_eq!(deltas.len(), i as usize + 1); + let flat_state = + flat_state_factory.new_flat_state_for_shard(0, Some(block_hash), false).unwrap(); + assert_eq!(flat_state.get_ref(&[1]).unwrap(), Some(ValueRef::new(&[i as u8]))); } + + // 3. Create a new block that deletes &[1] and add a new value &[2] + // Add the block to flat storage. let hash = chain.create_block(); let store_update = flat_storage_state .add_block( @@ -883,18 +919,41 @@ mod tests { ) .unwrap(); store_update.commit().unwrap(); + + // 4. Create a flat_state0 at block 10 and flat_state1 at block 4 + // Verify that they return the correct values let deltas = flat_storage_state.get_deltas_between_blocks(&chain.get_block_hash(10)).unwrap(); - assert_eq!(deltas.len(), 10); + assert_eq!(deltas.len(), 11); + let flat_state0 = flat_state_factory + .new_flat_state_for_shard(0, Some(chain.get_block_hash(10)), false) + .unwrap(); + let flat_state1 = flat_state_factory + .new_flat_state_for_shard(0, Some(chain.get_block_hash(4)), false) + .unwrap(); + assert_eq!(flat_state0.get_ref(&[1]).unwrap(), None); + assert_eq!(flat_state0.get_ref(&[2]).unwrap(), Some(ValueRef::new(&[1]))); + assert_eq!(flat_state1.get_ref(&[1]).unwrap(), Some(ValueRef::new(&[4]))); + assert_eq!(flat_state1.get_ref(&[2]).unwrap(), None); + + // 5. Move the flat head to block 5, verify that flat_state0 still returns the same values + // and flat_state1 returns an error. Also check that DBCol::FlatState is updated correctly flat_storage_state.update_flat_head(&chain.get_block_hash(5)).unwrap(); assert_eq!(store_helper::get_state(&store, &[1]).unwrap(), Some(ValueRef::new(&[5]))); - let deltas = flat_storage_state.get_deltas_between_blocks(&chain.get_block_hash(10)).unwrap(); - assert_eq!(deltas.len(), 5); + assert_eq!(deltas.len(), 6); + assert_eq!(flat_state0.get_ref(&[1]).unwrap(), None); + assert_eq!(flat_state0.get_ref(&[2]).unwrap(), Some(ValueRef::new(&[1]))); + assert_matches!(flat_state1.get_ref(&[1]), Err(StorageError::FlatStorageError(_))); + + // 6. Move the flat head to block 10, verify that flat_state0 still returns the same values + // Also checks that DBCol::FlatState is updated correctly. flat_storage_state.update_flat_head(&chain.get_block_hash(10)).unwrap(); assert_eq!(store_helper::get_state(&store, &[1]).unwrap(), None); assert_eq!(store_helper::get_state(&store, &[2]).unwrap(), Some(ValueRef::new(&[1]))); + assert_eq!(flat_state0.get_ref(&[1]).unwrap(), None); + assert_eq!(flat_state0.get_ref(&[2]).unwrap(), Some(ValueRef::new(&[1]))); } #[test] diff --git a/core/store/src/trie/mod.rs b/core/store/src/trie/mod.rs index 9fcc22703dd..30627f4e30a 100644 --- a/core/store/src/trie/mod.rs +++ b/core/store/src/trie/mod.rs @@ -876,7 +876,7 @@ impl Trie { pub fn get_ref(&self, key: &[u8]) -> Result, StorageError> { let is_delayed = is_delayed_receipt_key(key); match &self.flat_state { - Some(flat_state) if !is_delayed => flat_state.get_ref(&key).map_err(|e| e.into()), + Some(flat_state) if !is_delayed => flat_state.get_ref(&key), _ => { let key = NibbleSlice::new(key); self.lookup(key) diff --git a/integration-tests/src/tests/client/process_blocks.rs b/integration-tests/src/tests/client/process_blocks.rs index 7bf7381cf9c..971e16806cd 100644 --- a/integration-tests/src/tests/client/process_blocks.rs +++ b/integration-tests/src/tests/client/process_blocks.rs @@ -2869,6 +2869,8 @@ fn test_fork_receipt_ids() { #[test] fn test_fork_execution_outcome() { + init_test_logger(); + let (mut env, tx_hash) = prepare_env_with_transaction(); let mut last_height = 0; From c067ff5db730974092a0033b3da0e62056454ad9 Mon Sep 17 00:00:00 2001 From: Min Zhang Date: Mon, 26 Sep 2022 17:26:24 -0400 Subject: [PATCH 5/7] address comments --- chain/chain/src/chain.rs | 17 ++------ core/store/src/flat_state.rs | 81 ++++++++++++++---------------------- 2 files changed, 36 insertions(+), 62 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index fbbd2bed35a..ee25ebc89e9 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -49,7 +49,7 @@ use near_primitives::views::{ SignedTransactionView, }; #[cfg(feature = "protocol_feature_flat_state")] -use near_store::flat_state; +use near_store::{flat_state, StorageError}; use near_store::{DBCol, ShardTries, StoreUpdate, WrappedTrieChanges}; use crate::block_processing_utils::{ @@ -621,15 +621,6 @@ impl Chain { shard_id, &block_head.last_block_hash, ); - // The genesis block doesn't include any transactions or receipts, so the - // block delta is empty - flat_state::store_helper::set_delta( - &mut tmp_store_update, - shard_id, - block_head.last_block_hash, - &FlatStateDelta::default(), - ) - .map_err(|e| StorageError::from(e))?; } store_update.merge(tmp_store_update); } @@ -648,7 +639,7 @@ impl Chain { let flat_storage_state = FlatStorageState::new( store.store().clone(), shard_id, - store.head().unwrap().height, + store.head()?.height, &store, ); runtime_adapter.add_flat_storage_state_for_shard(shard_id, flat_storage_state); @@ -4674,6 +4665,8 @@ impl<'a> ChainUpdate<'a> { shard_id: ShardId, trie_changes: &WrappedTrieChanges, ) -> Result<(), Error> { + // Right now, we don't implement flat storage for catchup, so we only store + // the delta for the shards that we are tracking this epoch #[cfg(feature = "protocol_feature_flat_state")] if self.runtime_adapter.cares_about_shard(me.as_ref(), &prev_hash, shard_id, true) { if let Some(chain_flat_storage) = @@ -4723,8 +4716,6 @@ impl<'a> ChainUpdate<'a> { apply_result.total_balance_burnt, ), ); - // Right now, we don't implement flat storage for catchup, so we only store - // the delta if we are not catching up self.save_flat_state_changes( me, block_hash, diff --git a/core/store/src/flat_state.rs b/core/store/src/flat_state.rs index 9427b53d37d..ba79ec469d9 100644 --- a/core/store/src/flat_state.rs +++ b/core/store/src/flat_state.rs @@ -114,7 +114,7 @@ mod imp { }; } - Ok(store_helper::get_state(&self.store, key)?) + Ok(store_helper::get_ref(&self.store, key)?) } } @@ -351,7 +351,7 @@ impl FlatStateDelta { #[cfg(feature = "protocol_feature_flat_state")] pub fn apply_to_flat_state(self, store_update: &mut StoreUpdate) { for (key, value) in self.0.into_iter() { - store_helper::set_state(store_update, key, value).expect(BORSH_ERR); + store_helper::set_ref(store_update, key, value).expect(BORSH_ERR); } } @@ -388,8 +388,8 @@ pub struct BlockInfo { // - `flat_head` is stored on disk. The value of flat_head in memory and on disk should always // be consistent with the flat state stored in `DbCol::FlatState` on disk. This means, updates to // these values much be atomic from the outside. -// - `blocks` and `deltas` store the same set of blocks, which is the set of blocks that -// `FlatStorageState::get_deltas_between_blocks` supports. For any block in `blocks`, `flat_head` +// - `blocks` and `deltas` store the same set of blocks, except that `flat_head` is in `blocks`, +// but not in `deltas`. For any block in `blocks`, `flat_head` // must be on the same chain as the block and all blocks between `flat_head` and the block must // also be in `blocks`. // - All deltas in `deltas` are stored on disk. And if a block is accepted by chain, its deltas @@ -402,8 +402,8 @@ struct FlatStorageStateInner { /// Id of the shard which state is accessed by this flat storage. #[allow(unused)] shard_id: ShardId, - /// The block for which we store the key value pairs of its state. For non catchup mode, - /// it should be the last final block. + /// The block for which we store the key value pairs of the state after it is applied. + /// For non catchup mode, it should be the last final block. #[allow(unused)] flat_head: CryptoHash, /// Stores some information for all blocks supported by flat storage, this is used for finding @@ -463,10 +463,7 @@ pub mod store_helper { .expect("Error writing flat head from storage") } - pub(crate) fn get_state( - store: &Store, - key: &[u8], - ) -> Result, FlatStorageError> { + pub(crate) fn get_ref(store: &Store, key: &[u8]) -> Result, FlatStorageError> { let raw_ref = store .get(crate::DBCol::FlatState, key) .map_err(|_| FlatStorageError::StorageInternalError); @@ -478,7 +475,7 @@ pub mod store_helper { } } - pub(crate) fn set_state( + pub(crate) fn set_ref( store_update: &mut StoreUpdate, key: Vec, value: Option, @@ -503,7 +500,7 @@ pub trait ChainAccessForFlatStorage { #[cfg(feature = "protocol_feature_flat_state")] impl FlatStorageStateInner { - /// Get deltas between blocks `target_block_hash`(inclusive) to flat head(inclusive), + /// Get deltas between blocks `target_block_hash`(inclusive) to flat head(exclusive), /// in backwards chain order. Returns an error if there is no path between these two them. fn get_deltas_between_blocks( &self, @@ -513,7 +510,7 @@ impl FlatStorageStateInner { let mut block_hash = target_block_hash.clone(); let mut deltas = vec![]; - loop { + while block_hash != self.flat_head { let block_info = self .blocks .get(&block_hash) @@ -531,10 +528,6 @@ impl FlatStorageStateInner { .unwrap_or_else(|| panic!("block delta for {:?} is not available", block_hash)); deltas.push(delta.clone()); - if block_hash == self.flat_head { - break; - } - block_hash = block_info.prev_hash; } @@ -580,16 +573,19 @@ impl FlatStorageState { block_info.height ); blocks.insert(hash, block_info); + deltas.insert( + hash, + store_helper::get_delta(&store, shard_id, hash) + .expect(BORSH_ERR) + .unwrap_or_else(|| { + panic!( + "Cannot find block delta for block {:?} shard {}", + hash, shard_id + ) + }), + ); } } - for hash in blocks.keys() { - deltas.insert( - *hash, - store_helper::get_delta(&store, shard_id, *hash).expect(BORSH_ERR).unwrap_or_else( - || panic!("Cannot find block delta for block {:?} shard {}", hash, shard_id), - ), - ); - } Self(Arc::new(RwLock::new(FlatStorageStateInner { store, @@ -882,7 +878,8 @@ mod tests { let store = create_test_store(); let mut store_update = store.store_update(); store_helper::set_flat_head(&mut store_update, 0, &chain.get_block_hash(0)); - for i in 0..10 { + store_helper::set_ref(&mut store_update, vec![1], Some(ValueRef::new(&[0]))).unwrap(); + for i in 1..10 { store_helper::set_delta( &mut store_update, 0, @@ -902,7 +899,7 @@ mod tests { for i in 0..10 { let block_hash = chain.get_block_hash(i); let deltas = flat_storage_state.get_deltas_between_blocks(&block_hash).unwrap(); - assert_eq!(deltas.len(), i as usize + 1); + assert_eq!(deltas.len(), i as usize); let flat_state = flat_state_factory.new_flat_state_for_shard(0, Some(block_hash), false).unwrap(); assert_eq!(flat_state.get_ref(&[1]).unwrap(), Some(ValueRef::new(&[i as u8]))); @@ -924,7 +921,7 @@ mod tests { // Verify that they return the correct values let deltas = flat_storage_state.get_deltas_between_blocks(&chain.get_block_hash(10)).unwrap(); - assert_eq!(deltas.len(), 11); + assert_eq!(deltas.len(), 10); let flat_state0 = flat_state_factory .new_flat_state_for_shard(0, Some(chain.get_block_hash(10)), false) .unwrap(); @@ -939,10 +936,10 @@ mod tests { // 5. Move the flat head to block 5, verify that flat_state0 still returns the same values // and flat_state1 returns an error. Also check that DBCol::FlatState is updated correctly flat_storage_state.update_flat_head(&chain.get_block_hash(5)).unwrap(); - assert_eq!(store_helper::get_state(&store, &[1]).unwrap(), Some(ValueRef::new(&[5]))); + assert_eq!(store_helper::get_ref(&store, &[1]).unwrap(), Some(ValueRef::new(&[5]))); let deltas = flat_storage_state.get_deltas_between_blocks(&chain.get_block_hash(10)).unwrap(); - assert_eq!(deltas.len(), 6); + assert_eq!(deltas.len(), 5); assert_eq!(flat_state0.get_ref(&[1]).unwrap(), None); assert_eq!(flat_state0.get_ref(&[2]).unwrap(), Some(ValueRef::new(&[1]))); assert_matches!(flat_state1.get_ref(&[1]), Err(StorageError::FlatStorageError(_))); @@ -950,26 +947,12 @@ mod tests { // 6. Move the flat head to block 10, verify that flat_state0 still returns the same values // Also checks that DBCol::FlatState is updated correctly. flat_storage_state.update_flat_head(&chain.get_block_hash(10)).unwrap(); - assert_eq!(store_helper::get_state(&store, &[1]).unwrap(), None); - assert_eq!(store_helper::get_state(&store, &[2]).unwrap(), Some(ValueRef::new(&[1]))); + let deltas = + flat_storage_state.get_deltas_between_blocks(&chain.get_block_hash(10)).unwrap(); + assert_eq!(deltas.len(), 0); + assert_eq!(store_helper::get_ref(&store, &[1]).unwrap(), None); + assert_eq!(store_helper::get_ref(&store, &[2]).unwrap(), Some(ValueRef::new(&[1]))); assert_eq!(flat_state0.get_ref(&[1]).unwrap(), None); assert_eq!(flat_state0.get_ref(&[2]).unwrap(), Some(ValueRef::new(&[1]))); } - - #[test] - fn flat_state_apply_single_delta() { - // TODO (#7327): check this scenario after implementing flat storage state: - // 1) create FlatState for one shard with FlatStorage - // 2) create FlatStateDelta and apply it - // 3) check that FlatStorageState contains the right values - } - - #[test] - fn flat_state_apply_delta_range() { - // TODO (#7327): check this scenario after implementing flat storage state: - // 1) add tree of blocks and FlatStateDeltas for them - // 2) call `get_deltas_between_blocks` and check its correctness - // 3) apply deltas and check that FlatStorageState contains the right values; - // e.g. for the same key only the latest value is applied - } } From fc595774ed56dc9323e4955274fb487bbb2fc87e Mon Sep 17 00:00:00 2001 From: Min Zhang Date: Mon, 26 Sep 2022 18:08:22 -0400 Subject: [PATCH 6/7] make set_flat_head private --- chain/chain/src/chain.rs | 23 ++++++++--------------- chain/chain/src/test_utils.rs | 8 ++++++++ chain/chain/src/types.rs | 6 ++++++ core/store/src/flat_state.rs | 34 ++++++++++++++++++++++++++++++++-- nearcore/src/runtime/mod.rs | 16 ++++++++++++++++ 5 files changed, 70 insertions(+), 17 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index ee25ebc89e9..269bffc069d 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -609,21 +609,14 @@ impl Chain { store_update.save_head(&block_head)?; store_update.save_final_head(&header_head)?; - #[cfg(feature = "protocol_feature_flat_state")] - { - let mut tmp_store_update = store_update.store().store_update(); - // Set the root block of flat state to be the genesis block. Later, when we - // init FlatStorageStates, we will read the from this column in storage, so it - // must be set here. - for shard_id in 0..runtime_adapter.num_shards(&block_head.epoch_id)? { - flat_state::store_helper::set_flat_head( - &mut tmp_store_update, - shard_id, - &block_head.last_block_hash, - ); - } - store_update.merge(tmp_store_update); - } + // Set the root block of flat state to be the genesis block. Later, when we + // init FlatStorageStates, we will read the from this column in storage, so it + // must be set here. + let tmp_store_update = runtime_adapter.set_flat_storage_state_for_genesis( + genesis.hash(), + genesis.header().epoch_id(), + )?; + store_update.merge(tmp_store_update); info!(target: "chain", "Init: saved genesis: #{} {} / {:?}", block_head.height, block_head.last_block_hash, state_roots); diff --git a/chain/chain/src/test_utils.rs b/chain/chain/src/test_utils.rs index 9127fa9bb80..5287519791e 100644 --- a/chain/chain/src/test_utils.rs +++ b/chain/chain/src/test_utils.rs @@ -701,6 +701,14 @@ impl RuntimeAdapter for KeyValueRuntime { ) { } + fn set_flat_storage_state_for_genesis( + &self, + _genesis_block: &CryptoHash, + _genesis_epoch_id: &EpochId, + ) -> Result { + Ok(self.store.store_update()) + } + fn get_prev_shard_ids( &self, _prev_hash: &CryptoHash, diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs index 7f448ea8ff9..c49dc5c5fdf 100644 --- a/chain/chain/src/types.rs +++ b/chain/chain/src/types.rs @@ -292,6 +292,12 @@ pub trait RuntimeAdapter: EpochManagerAdapter + Send + Sync { flat_storage_state: FlatStorageState, ); + fn set_flat_storage_state_for_genesis( + &self, + genesis_block: &CryptoHash, + genesis_epoch_id: &EpochId, + ) -> Result; + /// Validates a given signed transaction. /// If the state root is given, then the verification will use the account. Otherwise it will /// only validate the transaction math, limits and signatures. diff --git a/core/store/src/flat_state.rs b/core/store/src/flat_state.rs index ba79ec469d9..2a980d90ea0 100644 --- a/core/store/src/flat_state.rs +++ b/core/store/src/flat_state.rs @@ -58,7 +58,7 @@ mod imp { use std::collections::HashMap; use std::sync::{Arc, Mutex}; - use crate::Store; + use crate::{Store, StoreUpdate}; /// Struct for getting value references from the flat storage. /// @@ -147,6 +147,32 @@ mod imp { })) } + /// When a node starts from an empty database, this function must be called to ensure + /// information such as flat head is set up correctly in the database. + /// Note that this function is different from `add_flat_storage_state_for_shard`, + /// it must be called before `add_flat_storage_state_for_shard` if the node starts from + /// an empty database. + #[cfg(feature = "protocol_feature_flat_state")] + pub fn set_flat_storage_state_for_genesis( + &self, + store_update: &mut StoreUpdate, + shard_id: ShardId, + genesis_block: &CryptoHash, + ) { + let flat_storage_states = self.0.flat_storage_states.lock().expect(POISONED_LOCK_ERR); + assert!(!flat_storage_states.contains_key(&shard_id)); + store_helper::set_flat_head(store_update, shard_id, genesis_block); + } + + #[cfg(not(feature = "protocol_feature_flat_state"))] + pub fn set_flat_storage_state_for_genesis( + &self, + _store_update: &mut StoreUpdate, + _shard_id: ShardId, + _genesis_block: &CryptoHash, + ) { + } + /// Add a flat storage state for shard `shard_id`. The function also checks that /// the shard's flat storage state hasn't been set before, otherwise it panics. /// TODO (#7327): this behavior may change when we implement support for state sync @@ -457,7 +483,11 @@ pub mod store_helper { .unwrap_or_else(|| panic!("Cannot read flat head for shard {} from storage", shard_id)) } - pub fn set_flat_head(store_update: &mut StoreUpdate, shard_id: ShardId, val: &CryptoHash) { + pub(crate) fn set_flat_head( + store_update: &mut StoreUpdate, + shard_id: ShardId, + val: &CryptoHash, + ) { store_update .set_ser(crate::DBCol::FlatStateMisc, &shard_id.try_to_vec().unwrap(), val) .expect("Error writing flat head from storage") diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs index 37199c18bff..abffdc906a8 100644 --- a/nearcore/src/runtime/mod.rs +++ b/nearcore/src/runtime/mod.rs @@ -708,6 +708,22 @@ impl RuntimeAdapter for NightshadeRuntime { self.flat_state_factory.add_flat_storage_state_for_shard(shard_id, flat_storage_state) } + fn set_flat_storage_state_for_genesis( + &self, + genesis_block: &CryptoHash, + genesis_epoch_id: &EpochId, + ) -> Result { + let mut store_update = self.store.store_update(); + for shard_id in 0..self.num_shards(genesis_epoch_id)? { + self.flat_state_factory.set_flat_storage_state_for_genesis( + &mut store_update, + shard_id, + genesis_block, + ); + } + Ok(store_update) + } + fn validate_tx( &self, gas_price: Balance, From 73465f09cdc74fb0fa88d5aca68b2433f0a7ee02 Mon Sep 17 00:00:00 2001 From: Min Zhang Date: Mon, 26 Sep 2022 18:19:04 -0400 Subject: [PATCH 7/7] fix compilcation --- core/store/src/flat_state.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/core/store/src/flat_state.rs b/core/store/src/flat_state.rs index 2a980d90ea0..d24dabe850c 100644 --- a/core/store/src/flat_state.rs +++ b/core/store/src/flat_state.rs @@ -164,15 +164,6 @@ mod imp { store_helper::set_flat_head(store_update, shard_id, genesis_block); } - #[cfg(not(feature = "protocol_feature_flat_state"))] - pub fn set_flat_storage_state_for_genesis( - &self, - _store_update: &mut StoreUpdate, - _shard_id: ShardId, - _genesis_block: &CryptoHash, - ) { - } - /// Add a flat storage state for shard `shard_id`. The function also checks that /// the shard's flat storage state hasn't been set before, otherwise it panics. /// TODO (#7327): this behavior may change when we implement support for state sync @@ -260,7 +251,7 @@ mod imp { #[cfg(not(feature = "protocol_feature_flat_state"))] mod imp { use crate::flat_state::FlatStorageState; - use crate::Store; + use crate::{Store, StoreUpdate}; use near_primitives::hash::CryptoHash; use near_primitives::types::ShardId; @@ -306,6 +297,14 @@ mod imp { _flat_storage_state: FlatStorageState, ) { } + + pub fn set_flat_storage_state_for_genesis( + &self, + _store_update: &mut StoreUpdate, + _shard_id: ShardId, + _genesis_block: &CryptoHash, + ) { + } } }