Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement FlatStorageState #7663

Merged
merged 9 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 108 additions & 75 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ use near_primitives::views::{
SignedTransactionView,
};
#[cfg(feature = "protocol_feature_flat_state")]
use near_store::flat_state;
use near_store::{DBCol, ShardTries, StoreUpdate};
use near_store::{flat_state, StorageError};
use near_store::{DBCol, ShardTries, StoreUpdate, WrappedTrieChanges};

use crate::block_processing_utils::{
BlockPreprocessInfo, BlockProcessingArtifact, BlocksInProcessing, DoneApplyChunkCallback,
Expand Down Expand Up @@ -80,9 +80,8 @@ use near_primitives::shard_layout::{
account_id_to_shard_id, account_id_to_shard_uid, ShardLayout, ShardUId,
};
use near_primitives::version::PROTOCOL_VERSION;
use near_store::flat_state::FlatStateDelta;
#[cfg(feature = "protocol_feature_flat_state")]
use near_store::flat_state::FlatStorageState;
use near_store::flat_state::{FlatStateDelta, FlatStorageState};
use once_cell::sync::OnceCell;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

Expand Down Expand Up @@ -610,29 +609,14 @@ impl Chain {
store_update.save_head(&block_head)?;
store_update.save_final_head(&header_head)?;

#[cfg(feature = "protocol_feature_flat_state")]
{
let mut tmp_store_update = store_update.store().store_update();
// Set the root block of flat state to be the genesis block. Later, when we
// init FlatStorageStates, we will read the from this column in storage, so it
// must be set here.
for shard_id in 0..runtime_adapter.num_shards(&block_head.epoch_id)? {
flat_state::store_helper::set_flat_head(
&mut tmp_store_update,
shard_id,
&block_head.last_block_hash,
);
// The genesis block doesn't include any transactions or receipts, so the
// block delta is empty
flat_state::store_helper::set_delta(
&mut tmp_store_update,
shard_id,
block_head.last_block_hash,
&FlatStateDelta::new(),
)?;
}
store_update.merge(tmp_store_update);
}
// Set the root block of flat state to be the genesis block. Later, when we
// init FlatStorageStates, we will read the from this column in storage, so it
// must be set here.
let tmp_store_update = runtime_adapter.set_flat_storage_state_for_genesis(
genesis.hash(),
genesis.header().epoch_id(),
)?;
store_update.merge(tmp_store_update);

info!(target: "chain", "Init: saved genesis: #{} {} / {:?}", block_head.height, block_head.last_block_hash, state_roots);

Expand All @@ -647,18 +631,9 @@ impl Chain {
for shard_id in 0..runtime_adapter.num_shards(&block_head.epoch_id)? {
let flat_storage_state = FlatStorageState::new(
store.store().clone(),
store.head()?.height,
&|hash| store.get_block_header(hash).unwrap(),
&|height| {
store
.get_all_block_hashes_by_height(height)
.unwrap()
.values()
.flatten()
.copied()
.collect::<HashSet<_>>()
},
shard_id,
store.head()?.height,
&store,
);
runtime_adapter.add_flat_storage_state_for_shard(shard_id, flat_storage_state);
}
Expand Down Expand Up @@ -2085,6 +2060,44 @@ impl Chain {
chain_update.postprocess_block(me, &block, block_preprocess_info, apply_results)?;
chain_update.commit()?;

// Update flat storage head to be the last final block. Note that this update happens
// in a separate db transaction from the update from block processing. This is intentional
// because flat_storage_state need to be locked during the update of flat head, otherwise
// flat_storage_state is in an inconsistent state that could be accessed by the other
// apply chunks processes. This means, the flat head is not always the same as
// the last final block on chain, which is OK, because in the flat storage implementation
// we don't assume that.
// Also note, for now, we only update flat storage for the shards that we care about in this epoch
// TODO (#7327): support flat storage for state sync and block catchups
for shard_id in 0..self.runtime_adapter.num_shards(block.header().epoch_id())? {
if self.runtime_adapter.cares_about_shard(
me.as_ref(),
block.header().prev_hash(),
shard_id,
true,
) {
if let Some(flat_storage_state) =
self.runtime_adapter.get_flat_storage_state_for_shard(shard_id)
{
let mut new_flat_head = *block.header().last_final_block();
if new_flat_head == CryptoHash::default() {
new_flat_head = *self.genesis.hash();
}
flat_storage_state.update_flat_head(&new_flat_head).unwrap_or_else(|_| {
panic!(
"Cannot update flat head from {:?} to {:?}",
flat_storage_state.get_flat_head(),
new_flat_head
)
});
} else {
// TODO (#7327): some error handling code here. Should probably return an error (or panic?)
// here if the flat storage doesn't exist. We don't do that yet because
// flat storage is not fully enabled yet.
}
}
}

self.pending_state_patch.clear();

if let Some(tip) = &new_head {
Expand Down Expand Up @@ -3102,7 +3115,7 @@ impl Chain {
if self.blocks_in_processing.has_blocks_to_catch_up(&queued_block) {
processed_blocks.insert(queued_block, results);
} else {
match self.block_catch_up_postprocess(&queued_block, results) {
match self.block_catch_up_postprocess(me, &queued_block, results) {
Ok(_) => {
let mut saw_one = false;
for next_block_hash in
Expand Down Expand Up @@ -3155,13 +3168,14 @@ impl Chain {

fn block_catch_up_postprocess(
&mut self,
me: &Option<AccountId>,
block_hash: &CryptoHash,
results: Vec<Result<ApplyChunkResult, Error>>,
) -> Result<(), Error> {
let block = self.store.get_block(block_hash)?;
let prev_block = self.store.get_block(block.header().prev_hash())?;
let mut chain_update = self.chain_update();
chain_update.apply_chunk_postprocessing(&block, &prev_block, results, true)?;
chain_update.apply_chunk_postprocessing(me, &block, &prev_block, results)?;
chain_update.commit()?;
Ok(())
}
Expand Down Expand Up @@ -3603,7 +3617,10 @@ impl Chain {
&transaction.transaction.block_hash,
transaction_validity_period,
)
.map_err(|_| Error::from(Error::InvalidTransactions))?;
.map_err(|_| {
tracing::warn!("Invalid Transactions for mock node");
Error::from(Error::InvalidTransactions)
})?;
}
};

Expand Down Expand Up @@ -4487,18 +4504,19 @@ impl<'a> ChainUpdate<'a> {

fn apply_chunk_postprocessing(
&mut self,
me: &Option<AccountId>,
block: &Block,
prev_block: &Block,
apply_results: Vec<Result<ApplyChunkResult, Error>>,
is_catching_up: bool,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "chain", "apply_chunk_postprocessing").entered();
for result in apply_results {
self.process_apply_chunk_result(
me,
result?,
*block.hash(),
block.header().height(),
*prev_block.hash(),
is_catching_up,
)?
}
Ok(())
Expand Down Expand Up @@ -4630,13 +4648,42 @@ impl<'a> ChainUpdate<'a> {
Ok(())
}

#[allow(unused_variables)]
fn save_flat_state_changes(
&mut self,
me: &Option<AccountId>,
block_hash: CryptoHash,
prev_hash: CryptoHash,
height: BlockHeight,
shard_id: ShardId,
trie_changes: &WrappedTrieChanges,
) -> Result<(), Error> {
// Right now, we don't implement flat storage for catchup, so we only store
// the delta for the shards that we are tracking this epoch
#[cfg(feature = "protocol_feature_flat_state")]
if self.runtime_adapter.cares_about_shard(me.as_ref(), &prev_hash, shard_id, true) {
if let Some(chain_flat_storage) =
self.runtime_adapter.get_flat_storage_state_for_shard(shard_id)
{
let delta = FlatStateDelta::from_state_changes(&trie_changes.state_changes());
let block_info = flat_state::BlockInfo { hash: block_hash, height, prev_hash };
let store_update = chain_flat_storage
.add_block(&block_hash, delta, block_info)
.map_err(|e| StorageError::from(e))?;
self.chain_store_update.merge(store_update);
}
}
Ok(())
}

/// Processed results of applying chunk
fn process_apply_chunk_result(
&mut self,
me: &Option<AccountId>,
result: ApplyChunkResult,
block_hash: CryptoHash,
height: BlockHeight,
prev_block_hash: CryptoHash,
is_catching_up: bool,
) -> Result<(), Error> {
match result {
ApplyChunkResult::SameHeight(SameHeightResult {
Expand All @@ -4662,19 +4709,14 @@ impl<'a> ChainUpdate<'a> {
apply_result.total_balance_burnt,
),
);
// Right now, we don't implement flat storage for catchup, so we only store
// the delta if we are not catching up
if !is_catching_up {
if let Some(chain_flat_storage) =
self.runtime_adapter.get_flat_storage_state_for_shard(shard_id)
{
let delta = FlatStateDelta::from_state_changes(
&apply_result.trie_changes.state_changes(),
);
let store_update = chain_flat_storage.add_delta(&block_hash, delta)?;
self.chain_store_update.merge(store_update);
}
}
self.save_flat_state_changes(
me,
block_hash,
prev_block_hash,
height,
shard_id,
&apply_result.trie_changes,
)?;
self.chain_store_update.save_trie_changes(apply_result.trie_changes);
self.chain_store_update.save_outgoing_receipt(
&block_hash,
Expand Down Expand Up @@ -4708,6 +4750,14 @@ impl<'a> ChainUpdate<'a> {
let mut new_extra = ChunkExtra::clone(&old_extra);
*new_extra.state_root_mut() = apply_result.new_root;

self.save_flat_state_changes(
me,
block_hash,
prev_block_hash,
height,
shard_uid.shard_id(),
&apply_result.trie_changes,
)?;
self.chain_store_update.save_chunk_extra(&block_hash, &shard_uid, new_extra);
self.chain_store_update.save_trie_changes(apply_result.trie_changes);

Expand Down Expand Up @@ -4747,7 +4797,7 @@ impl<'a> ChainUpdate<'a> {
) -> Result<Option<Tip>, Error> {
let prev_hash = block.header().prev_hash();
let prev_block = self.chain_store_update.get_block(prev_hash)?;
self.apply_chunk_postprocessing(block, &prev_block, apply_chunks_results, false)?;
self.apply_chunk_postprocessing(me, block, &prev_block, apply_chunks_results)?;

let BlockPreprocessInfo {
is_caught_up,
Expand Down Expand Up @@ -4830,23 +4880,6 @@ impl<'a> ChainUpdate<'a> {
}
}
}

// TODO: fill in the logic here for which shards we need to update flat storage for
// only need shards that we have processed in this block (not the shards to catch up)
let flat_storage_shards = vec![];
for shard_id in flat_storage_shards {
if let Some(_flat_storage_state) =
self.runtime_adapter.get_flat_storage_state_for_shard(shard_id)
{
// TODO: fill in the correct new head
// if let Some(_update) = flat_storage_state.update_head(&CryptoHash::default()) {
// TODO: add this update to chain update
// }
} else {
// TODO: some error handling code here. Should probably return an error (or panic?)
// here if the flat storage doesn't exist.
}
}
Ok(res)
}

Expand Down
19 changes: 19 additions & 0 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ use near_store::{
use crate::types::{Block, BlockHeader, LatestKnown};
use crate::{byzantine_assert, RuntimeAdapter};
use near_store::db::StoreStatistics;
#[cfg(feature = "protocol_feature_flat_state")]
use near_store::flat_state::{BlockInfo, ChainAccessForFlatStorage};
use std::sync::Arc;

/// lru cache size
Expand Down Expand Up @@ -1118,6 +1120,23 @@ impl ChainStoreAccess for ChainStore {
}
}

#[cfg(feature = "protocol_feature_flat_state")]
impl ChainAccessForFlatStorage for ChainStore {
fn get_block_info(&self, block_hash: &CryptoHash) -> BlockInfo {
let header = self.get_block_header(block_hash).unwrap();
BlockInfo { hash: *block_hash, height: header.height(), prev_hash: *header.prev_hash() }
}

fn get_block_hashes_at_height(&self, height: BlockHeight) -> HashSet<CryptoHash> {
self.get_all_block_hashes_by_height(height)
.unwrap()
.values()
.flatten()
.copied()
.collect::<HashSet<_>>()
}
}

/// Cache update for ChainStore
#[derive(Default)]
struct ChainStoreCacheUpdate {
Expand Down
8 changes: 8 additions & 0 deletions chain/chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,14 @@ impl RuntimeAdapter for KeyValueRuntime {
) {
}

fn set_flat_storage_state_for_genesis(
&self,
_genesis_block: &CryptoHash,
_genesis_epoch_id: &EpochId,
) -> Result<StoreUpdate, Error> {
Ok(self.store.store_update())
}

fn get_prev_shard_ids(
&self,
_prev_hash: &CryptoHash,
Expand Down
6 changes: 6 additions & 0 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ pub trait RuntimeAdapter: EpochManagerAdapter + Send + Sync {
flat_storage_state: FlatStorageState,
);

fn set_flat_storage_state_for_genesis(
&self,
genesis_block: &CryptoHash,
genesis_epoch_id: &EpochId,
) -> Result<StoreUpdate, Error>;

/// Validates a given signed transaction.
/// If the state root is given, then the verification will use the account. Otherwise it will
/// only validate the transaction math, limits and signatures.
Expand Down
2 changes: 2 additions & 0 deletions core/primitives/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ pub enum StorageError {
/// panic in every place that produces this error.
/// We can check if db is corrupted by verifying everything in the state trie.
StorageInconsistentState(String),
/// Error from flat storage
FlatStorageError(String),
}

impl std::fmt::Display for StorageError {
Expand Down
Loading