Skip to content

Commit

Permalink
Remove DBCol::ChunkPerHeightShard
Browse files Browse the repository at this point in the history
  • Loading branch information
robin-near authored and robin-near committed Sep 26, 2022
1 parent 55a3b4d commit 661aaf5
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 117 deletions.
78 changes: 1 addition & 77 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ pub enum GCMode {
StateSync { clear_block_info: bool },
}

fn get_height_shard_id(height: BlockHeight, shard_id: ShardId) -> Vec<u8> {
let mut res = Vec::with_capacity(40);
res.extend_from_slice(&height.to_le_bytes());
res.extend_from_slice(&shard_id.to_le_bytes());
res
}

/// Accesses the chain store. Used to create atomic editable views that can be reverted.
pub trait ChainStoreAccess {
/// Returns underlaying store.
Expand Down Expand Up @@ -175,12 +168,6 @@ pub trait ChainStoreAccess {
) -> Result<Arc<LightClientBlockView>, Error>;
/// Returns a number of references for Block with `block_hash`
fn get_block_refcount(&self, block_hash: &CryptoHash) -> Result<u64, Error>;
/// Check if we saw chunk hash at given height and shard id.
fn get_any_chunk_hash_by_height_shard(
&self,
height: BlockHeight,
shard_id: ShardId,
) -> Result<ChunkHash, Error>;
/// Returns block header from the current chain defined by `sync_hash` for given height if present.
fn get_block_header_on_chain_by_height(
&self,
Expand Down Expand Up @@ -341,8 +328,6 @@ pub struct ChainStore {
height: CellLruCache<Vec<u8>, CryptoHash>,
/// Cache with height to block hash on any chain.
block_hash_per_height: CellLruCache<Vec<u8>, Arc<HashMap<EpochId, HashSet<CryptoHash>>>>,
/// Cache with height and shard_id to any chunk hash.
chunk_hash_per_height_shard: CellLruCache<Vec<u8>, ChunkHash>,
/// Next block hashes for each block on the canonical chain
next_block_hashes: CellLruCache<Vec<u8>, CryptoHash>,
/// Light client blocks corresponding to the last finalized block of each epoch
Expand Down Expand Up @@ -399,7 +384,6 @@ impl ChainStore {
height: CellLruCache::new(CACHE_SIZE),
block_hash_per_height: CellLruCache::new(CACHE_SIZE),
block_refcounts: CellLruCache::new(CACHE_SIZE),
chunk_hash_per_height_shard: CellLruCache::new(CACHE_SIZE),
next_block_hashes: CellLruCache::new(CACHE_SIZE),
epoch_light_client_blocks: CellLruCache::new(CACHE_SIZE),
outgoing_receipts: CellLruCache::new(CACHE_SIZE),
Expand Down Expand Up @@ -993,21 +977,6 @@ impl ChainStoreAccess for ChainStore {
)
}

fn get_any_chunk_hash_by_height_shard(
&self,
height: BlockHeight,
shard_id: ShardId,
) -> Result<ChunkHash, Error> {
option_to_not_found(
self.read_with_cache(
DBCol::ChunkPerHeightShard,
&self.chunk_hash_per_height_shard,
&get_height_shard_id(height, shard_id),
),
format_args!("CHUNK PER HEIGHT AND SHARD ID: {} {}", height, shard_id),
)
}

/// Get outgoing receipts *generated* from shard `shard_id` in block `prev_hash`
/// Note that this function is different from get_outgoing_receipts_for_shard, see comments there
fn get_outgoing_receipts(
Expand Down Expand Up @@ -1147,7 +1116,6 @@ struct ChainStoreCacheUpdate {
chunks: HashMap<ChunkHash, Arc<ShardChunk>>,
partial_chunks: HashMap<ChunkHash, Arc<PartialEncodedChunk>>,
block_hash_per_height: HashMap<BlockHeight, HashMap<EpochId, HashSet<CryptoHash>>>,
chunk_hash_per_height_shard: HashMap<(BlockHeight, ShardId), ChunkHash>,
height_to_hashes: HashMap<BlockHeight, Option<CryptoHash>>,
next_block_hashes: HashMap<CryptoHash, CryptoHash>,
epoch_light_client_blocks: HashMap<CryptoHash, Arc<LightClientBlockView>>,
Expand Down Expand Up @@ -1371,20 +1339,6 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
}
}

fn get_any_chunk_hash_by_height_shard(
&self,
height: BlockHeight,
shard_id: ShardId,
) -> Result<ChunkHash, Error> {
if let Some(chunk_hash) =
self.chain_store_cache_update.chunk_hash_per_height_shard.get(&(height, shard_id))
{
Ok(chunk_hash.clone())
} else {
self.chain_store.get_any_chunk_hash_by_height_shard(height, shard_id)
}
}

fn get_next_block_hash(&self, hash: &CryptoHash) -> Result<CryptoHash, Error> {
if let Some(next_hash) = self.chain_store_cache_update.next_block_hashes.get(hash) {
Ok(*next_hash)
Expand Down Expand Up @@ -1891,17 +1845,6 @@ impl<'a> ChainStoreUpdate<'a> {
self.chain_store_cache_update.invalid_chunks.insert(chunk.chunk_hash(), Arc::new(chunk));
}

pub fn save_chunk_hash(
&mut self,
height: BlockHeight,
shard_id: ShardId,
chunk_hash: ChunkHash,
) {
self.chain_store_cache_update
.chunk_hash_per_height_shard
.insert((height, shard_id), chunk_hash);
}

pub fn save_block_height_processed(&mut self, height: BlockHeight) {
self.chain_store_cache_update.processed_block_heights.insert(height);
}
Expand Down Expand Up @@ -2135,7 +2078,6 @@ impl<'a> ChainStoreUpdate<'a> {
let block_shard_id = get_block_shard_id(&block_hash, shard_id);
self.gc_outgoing_receipts(&block_hash, shard_id);
self.gc_col(DBCol::IncomingReceipts, &block_shard_id);
self.gc_col(DBCol::ChunkPerHeightShard, &block_shard_id);

// For incoming State Parts it's done in chain.clear_downloaded_parts()
// The following code is mostly for outgoing State Parts.
Expand Down Expand Up @@ -2323,10 +2265,6 @@ impl<'a> ChainStoreUpdate<'a> {
store_update.delete(col, key);
self.chain_store.incoming_receipts.pop(key);
}
DBCol::ChunkPerHeightShard => {
store_update.delete(col, key);
self.chain_store.chunk_hash_per_height_shard.pop(key);
}
DBCol::StateHeaders => {
store_update.delete(col, key);
}
Expand Down Expand Up @@ -2439,6 +2377,7 @@ impl<'a> ChainStoreUpdate<'a> {
| DBCol::EpochStart
| DBCol::EpochValidatorInfo
| DBCol::BlockOrdinal
| DBCol::_ChunkPerHeightShard
| DBCol::_NextBlockWithNewChunk
| DBCol::_LastBlockWithNewChunk
| DBCol::_TransactionRefCount
Expand Down Expand Up @@ -2533,10 +2472,6 @@ impl<'a> ChainStoreUpdate<'a> {
.chain_store_cache_update
.chunks
.insert(chunk_hash.clone(), source_store.get_chunk(&chunk_hash)?.clone());
chain_store_update
.chain_store_cache_update
.chunk_hash_per_height_shard
.insert((height, shard_id), chunk_hash);
chain_store_update.chain_store_cache_update.outgoing_receipts.insert(
(*block_hash, shard_id),
source_store.get_outgoing_receipts(block_hash, shard_id)?.clone(),
Expand Down Expand Up @@ -2652,12 +2587,6 @@ impl<'a> ChainStoreUpdate<'a> {
for (block_hash, block_extra) in self.chain_store_cache_update.block_extras.iter() {
store_update.insert_ser(DBCol::BlockExtra, block_hash.as_ref(), block_extra)?;
}
for ((height, shard_id), chunk_hash) in
self.chain_store_cache_update.chunk_hash_per_height_shard.iter()
{
let key = get_height_shard_id(*height, *shard_id);
store_update.insert_ser(DBCol::ChunkPerHeightShard, &key, chunk_hash)?;
}
let mut chunk_hashes_by_height: HashMap<BlockHeight, HashSet<ChunkHash>> = HashMap::new();
for (chunk_hash, chunk) in self.chain_store_cache_update.chunks.iter() {
if self.chain_store.get_chunk(chunk_hash).is_ok() {
Expand Down Expand Up @@ -2907,7 +2836,6 @@ impl<'a> ChainStoreUpdate<'a> {
chunks,
partial_chunks,
block_hash_per_height,
chunk_hash_per_height_shard,
height_to_hashes,
next_block_hashes,
epoch_light_client_blocks,
Expand Down Expand Up @@ -2949,10 +2877,6 @@ impl<'a> ChainStoreUpdate<'a> {
.block_hash_per_height
.put(index_to_bytes(height).to_vec(), Arc::new(epoch_id_to_hash));
}
for ((height, shard_id), chunk_hash) in chunk_hash_per_height_shard {
let key = get_height_shard_id(height, shard_id);
self.chain_store.chunk_hash_per_height_shard.put(key, chunk_hash);
}
for (height, block_hash) in height_to_hashes {
let bytes = index_to_bytes(height);
if let Some(hash) = block_hash {
Expand Down
17 changes: 17 additions & 0 deletions chain/chunks/src/chunk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ pub struct EncodedChunksCache {
/// A map from a block height to chunk hashes at this height for all chunk stored in the cache
/// This is used to gc chunks that are out of horizon
height_map: HashMap<BlockHeight, HashSet<ChunkHash>>,
/// A map from block height to shard ID to the chunk hash we've received, so we only process
/// one chunk per shard per height.
height_to_shard_to_chunk: HashMap<BlockHeight, HashMap<ShardId, ChunkHash>>,
/// A map from a block hash to a set of incomplete chunks (does not have all parts and receipts yet)
/// whose previous block is the block hash.
incomplete_chunks: HashMap<CryptoHash, HashSet<ChunkHash>>,
Expand Down Expand Up @@ -108,6 +111,7 @@ impl EncodedChunksCache {
largest_seen_height: 0,
encoded_chunks: HashMap::new(),
height_map: HashMap::new(),
height_to_shard_to_chunk: HashMap::new(),
incomplete_chunks: HashMap::new(),
block_hash_to_chunk_headers: HashMap::new(),
}
Expand Down Expand Up @@ -180,6 +184,10 @@ impl EncodedChunksCache {
.entry(chunk_header.height_created())
.or_default()
.insert(chunk_hash.clone());
self.height_to_shard_to_chunk
.entry(chunk_header.height_created())
.or_default()
.insert(chunk_header.shard_id(), chunk_hash.clone());
self.incomplete_chunks
.entry(chunk_header.prev_block_hash().clone())
.or_default()
Expand All @@ -200,6 +208,14 @@ impl EncodedChunksCache {
self.height_within_front_horizon(height) || self.height_within_rear_horizon(height)
}

pub fn get_chunk_hash_by_height_and_shard(
&self,
height: BlockHeight,
shard_id: ShardId,
) -> Option<&ChunkHash> {
self.height_to_shard_to_chunk.get(&height)?.get(&shard_id)
}

/// Add parts and receipts stored in a partial encoded chunk to the corresponding chunk entry,
/// returning the set of part ords that were previously unknown.
pub fn merge_in_partial_encoded_chunk(
Expand Down Expand Up @@ -240,6 +256,7 @@ impl EncodedChunksCache {
}
}
}
self.height_to_shard_to_chunk.remove(&height);
}
}

Expand Down
20 changes: 6 additions & 14 deletions chain/chunks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1701,10 +1701,11 @@ impl ShardsManager {
return Err(Error::ChainError(near_chain::Error::InvalidChunkHeight));
}
// We shouldn't process unrequested chunk if we have seen one with same (height_created + shard_id) but different chunk_hash
if let Ok(hash) = chain_store
.get_any_chunk_hash_by_height_shard(header.height_created(), header.shard_id())
if let Some(hash) = self
.encoded_chunks
.get_chunk_hash_by_height_and_shard(header.height_created(), header.shard_id())
{
if hash != chunk_hash {
if hash != &chunk_hash {
warn!(target: "client", "Rejecting unrequested chunk {:?}, height {}, shard_id {}, because of having {:?}", chunk_hash, header.height_created(), header.shard_id(), hash);
return Err(Error::DuplicateChunkHeight);
}
Expand Down Expand Up @@ -1753,17 +1754,8 @@ impl ShardsManager {
}
}

// 2. Consider it valid and stores it
// Store chunk hash into chunk_hash_per_height_shard collection
let mut store_update = chain_store.store_update();
store_update.save_chunk_hash(
header.height_created(),
header.shard_id(),
chunk_hash.clone(),
);
store_update.commit()?;

// Merge parts and receipts included in the partial encoded chunk into chunk cache
// 2. Consider it valid; mergeparts and receipts included in the partial encoded chunk
// into chunk cache
let new_part_ords =
self.encoded_chunks.merge_in_partial_encoded_chunk(partial_encoded_chunk);

Expand Down
8 changes: 3 additions & 5 deletions core/store/src/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,9 @@ pub enum DBCol {
/// - *Rows*: transaction hash
/// - *Column type*: SignedTransaction
Transactions,
/// Mapping from a given (Height, ShardId) to the Chunk hash.
/// - *Rows*: (Height || ShardId) - (u64 || u64)
/// - *Column type*: ChunkHash (CryptoHash)
ChunkPerHeightShard,
/// Deprecated.
#[strum(serialize = "ChunkPerHeightShard")]
_ChunkPerHeightShard,
/// Changes to state (Trie) that we have recorded.
/// - *Rows*: BlockHash || TrieKey (TrieKey is written via custom to_vec)
/// - *Column type*: TrieKey, new value and reason for change (RawStateChangesWithTrieKey)
Expand Down Expand Up @@ -277,7 +276,6 @@ impl DBCol {
| DBCol::BlockHeader
| DBCol::BlockExtra
| DBCol::BlockInfo
| DBCol::ChunkPerHeightShard
| DBCol::Chunks
| DBCol::InvalidChunks
| DBCol::PartialChunks => true,
Expand Down
17 changes: 0 additions & 17 deletions core/store/src/db/colddb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,6 @@ fn get_cold_key<'a>(col: DBCol, key: &[u8], buffer: &'a mut [u8; 32]) -> Option<
buffer[..8].copy_from_slice(&num.to_be_bytes());
Some(&buffer[..8])
}
DBCol::ChunkPerHeightShard => {
// Key is `little_endian(height) || ShardUId`. We’re leaving
// ShardUId alone.
let num = u64::from_le_bytes(key[..8].try_into().unwrap());
buffer[..8].copy_from_slice(&num.to_be_bytes());
buffer[8..16].copy_from_slice(&key[8..16]);
Some(&buffer[..16])
}
DBCol::State => {
// Key is `ShardUId || CryptoHash(node_or_value)`. We’re stripping
// the ShardUId.
Expand Down Expand Up @@ -330,7 +322,6 @@ mod test {
DBCol::HeaderHashesByHeight,
];
let mut ops: Vec<_> = height_columns.iter().map(|col| set(*col, HEIGHT_LE)).collect();
ops.push(set(DBCol::ChunkPerHeightShard, &[HEIGHT_LE, SHARD].concat()));
ops.push(set(DBCol::State, &[SHARD, HASH].concat()));
ops.push(set(DBCol::Block, HASH));
db.write(DBTransaction { ops }).unwrap();
Expand Down Expand Up @@ -364,8 +355,6 @@ mod test {
fetch(col, HEIGHT_LE, false);
fetch(col, HEIGHT_BE, false);
}
fetch(DBCol::ChunkPerHeightShard, &[HEIGHT_LE, SHARD].concat(), false);
fetch(DBCol::ChunkPerHeightShard, &[HEIGHT_BE, SHARD].concat(), false);
fetch(DBCol::State, &[SHARD, HASH].concat(), false);
fetch(DBCol::State, &[HASH].concat(), true);
fetch(DBCol::Block, HASH, false);
Expand Down Expand Up @@ -404,12 +393,6 @@ mod test {
HeaderHashesByHeight be(42)
[cold] get_raw_bytes → ∅
[raw ] get_raw_bytes → FooBar
ChunkPerHeightShard `le(42) || ShardUId`
[cold] get_raw_bytes → FooBar
[raw ] get_raw_bytes → ∅
ChunkPerHeightShard `be(42) || ShardUId`
[cold] get_raw_bytes → ∅
[raw ] get_raw_bytes → FooBar
State `ShardUId || 11111111111111111111111111111111`
[cold] get_raw_bytes → FooBar; rc: 1
[cold] get_sans_rc → FooBar
Expand Down
2 changes: 1 addition & 1 deletion core/store/src/db/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ fn col_name(col: DBCol) -> &'static str {
DBCol::ComponentEdges => "col31",
DBCol::LastComponentNonce => "col32",
DBCol::Transactions => "col33",
DBCol::ChunkPerHeightShard => "col34",
DBCol::_ChunkPerHeightShard => "col34",
DBCol::StateChanges => "col35",
DBCol::BlockRefCount => "col36",
DBCol::TrieChanges => "col37",
Expand Down
10 changes: 10 additions & 0 deletions core/store/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,13 @@ pub fn migrate_29_to_30(store_opener: &StoreOpener) -> anyhow::Result<()> {
set_store_version(&store, 30)?;
Ok(())
}

pub fn migrate_31_to_32(store_opener: &StoreOpener) -> anyhow::Result<()> {
let store = store_opener.open().unwrap().get_store(crate::Temperature::Hot);
let mut store_update = store.store_update();
store_update.delete_all(DBCol::_ChunkPerHeightShard);
store_update.commit()?;

set_store_version(&store, 32)?;
Ok(())
}
2 changes: 1 addition & 1 deletion core/store/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
pub type DbVersion = u32;

/// Current version of the database.
pub const DB_VERSION: DbVersion = 31;
pub const DB_VERSION: DbVersion = 32;

/// Returns serialisation of the version.
///
Expand Down
7 changes: 6 additions & 1 deletion nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use near_network_primitives::time;
use near_primitives::block::GenesisId;
#[cfg(feature = "performance_stats")]
use near_rust_allocator_proxy::reset_memory_usage_max;
use near_store::migrations::{migrate_28_to_29, migrate_29_to_30};
use near_store::migrations::{migrate_28_to_29, migrate_29_to_30, migrate_31_to_32};
use near_store::version::{set_store_version, DbVersion, DB_VERSION};
use near_store::{DBCol, Mode, NodeStorage, StoreOpener, Temperature};
use near_telemetry::TelemetryActor;
Expand Down Expand Up @@ -167,6 +167,11 @@ fn apply_store_migrations_if_exists(
info!(target: "near", "Migrate DB from version 30 to 31");
migrate_30_to_31(store_opener, &near_config)?;
}
if db_version <= 31 {
// version 31 => 32: delete column ChunkPerHeightShard
info!(target: "near", "Migrate DB from version 30 to 31");
migrate_31_to_32(store_opener)?;
}

if cfg!(feature = "nightly") || cfg!(feature = "nightly_protocol") {
let store = store_opener
Expand Down
Loading

0 comments on commit 661aaf5

Please sign in to comment.