diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index 7a1e2ec0337..235ea8eaae6 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -64,13 +64,6 @@ pub enum GCMode { StateSync { clear_block_info: bool }, } -fn get_height_shard_id(height: BlockHeight, shard_id: ShardId) -> Vec { - let mut res = Vec::with_capacity(40); - res.extend_from_slice(&height.to_le_bytes()); - res.extend_from_slice(&shard_id.to_le_bytes()); - res -} - /// Accesses the chain store. Used to create atomic editable views that can be reverted. pub trait ChainStoreAccess { /// Returns underlaying store. @@ -175,12 +168,6 @@ pub trait ChainStoreAccess { ) -> Result, Error>; /// Returns a number of references for Block with `block_hash` fn get_block_refcount(&self, block_hash: &CryptoHash) -> Result; - /// Check if we saw chunk hash at given height and shard id. - fn get_any_chunk_hash_by_height_shard( - &self, - height: BlockHeight, - shard_id: ShardId, - ) -> Result; /// Returns block header from the current chain defined by `sync_hash` for given height if present. fn get_block_header_on_chain_by_height( &self, @@ -341,8 +328,6 @@ pub struct ChainStore { height: CellLruCache, CryptoHash>, /// Cache with height to block hash on any chain. block_hash_per_height: CellLruCache, Arc>>>, - /// Cache with height and shard_id to any chunk hash. - chunk_hash_per_height_shard: CellLruCache, ChunkHash>, /// Next block hashes for each block on the canonical chain next_block_hashes: CellLruCache, CryptoHash>, /// Light client blocks corresponding to the last finalized block of each epoch @@ -399,7 +384,6 @@ impl ChainStore { height: CellLruCache::new(CACHE_SIZE), block_hash_per_height: CellLruCache::new(CACHE_SIZE), block_refcounts: CellLruCache::new(CACHE_SIZE), - chunk_hash_per_height_shard: CellLruCache::new(CACHE_SIZE), next_block_hashes: CellLruCache::new(CACHE_SIZE), epoch_light_client_blocks: CellLruCache::new(CACHE_SIZE), outgoing_receipts: CellLruCache::new(CACHE_SIZE), @@ -993,21 +977,6 @@ impl ChainStoreAccess for ChainStore { ) } - fn get_any_chunk_hash_by_height_shard( - &self, - height: BlockHeight, - shard_id: ShardId, - ) -> Result { - option_to_not_found( - self.read_with_cache( - DBCol::ChunkPerHeightShard, - &self.chunk_hash_per_height_shard, - &get_height_shard_id(height, shard_id), - ), - format_args!("CHUNK PER HEIGHT AND SHARD ID: {} {}", height, shard_id), - ) - } - /// Get outgoing receipts *generated* from shard `shard_id` in block `prev_hash` /// Note that this function is different from get_outgoing_receipts_for_shard, see comments there fn get_outgoing_receipts( @@ -1147,7 +1116,6 @@ struct ChainStoreCacheUpdate { chunks: HashMap>, partial_chunks: HashMap>, block_hash_per_height: HashMap>>, - chunk_hash_per_height_shard: HashMap<(BlockHeight, ShardId), ChunkHash>, height_to_hashes: HashMap>, next_block_hashes: HashMap, epoch_light_client_blocks: HashMap>, @@ -1371,20 +1339,6 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> { } } - fn get_any_chunk_hash_by_height_shard( - &self, - height: BlockHeight, - shard_id: ShardId, - ) -> Result { - if let Some(chunk_hash) = - self.chain_store_cache_update.chunk_hash_per_height_shard.get(&(height, shard_id)) - { - Ok(chunk_hash.clone()) - } else { - self.chain_store.get_any_chunk_hash_by_height_shard(height, shard_id) - } - } - fn get_next_block_hash(&self, hash: &CryptoHash) -> Result { if let Some(next_hash) = self.chain_store_cache_update.next_block_hashes.get(hash) { Ok(*next_hash) @@ -1891,17 +1845,6 @@ impl<'a> ChainStoreUpdate<'a> { self.chain_store_cache_update.invalid_chunks.insert(chunk.chunk_hash(), Arc::new(chunk)); } - pub fn save_chunk_hash( - &mut self, - height: BlockHeight, - shard_id: ShardId, - chunk_hash: ChunkHash, - ) { - self.chain_store_cache_update - .chunk_hash_per_height_shard - .insert((height, shard_id), chunk_hash); - } - pub fn save_block_height_processed(&mut self, height: BlockHeight) { self.chain_store_cache_update.processed_block_heights.insert(height); } @@ -2135,7 +2078,6 @@ impl<'a> ChainStoreUpdate<'a> { let block_shard_id = get_block_shard_id(&block_hash, shard_id); self.gc_outgoing_receipts(&block_hash, shard_id); self.gc_col(DBCol::IncomingReceipts, &block_shard_id); - self.gc_col(DBCol::ChunkPerHeightShard, &block_shard_id); // For incoming State Parts it's done in chain.clear_downloaded_parts() // The following code is mostly for outgoing State Parts. @@ -2323,10 +2265,6 @@ impl<'a> ChainStoreUpdate<'a> { store_update.delete(col, key); self.chain_store.incoming_receipts.pop(key); } - DBCol::ChunkPerHeightShard => { - store_update.delete(col, key); - self.chain_store.chunk_hash_per_height_shard.pop(key); - } DBCol::StateHeaders => { store_update.delete(col, key); } @@ -2439,6 +2377,7 @@ impl<'a> ChainStoreUpdate<'a> { | DBCol::EpochStart | DBCol::EpochValidatorInfo | DBCol::BlockOrdinal + | DBCol::_ChunkPerHeightShard | DBCol::_NextBlockWithNewChunk | DBCol::_LastBlockWithNewChunk | DBCol::_TransactionRefCount @@ -2533,10 +2472,6 @@ impl<'a> ChainStoreUpdate<'a> { .chain_store_cache_update .chunks .insert(chunk_hash.clone(), source_store.get_chunk(&chunk_hash)?.clone()); - chain_store_update - .chain_store_cache_update - .chunk_hash_per_height_shard - .insert((height, shard_id), chunk_hash); chain_store_update.chain_store_cache_update.outgoing_receipts.insert( (*block_hash, shard_id), source_store.get_outgoing_receipts(block_hash, shard_id)?.clone(), @@ -2652,12 +2587,6 @@ impl<'a> ChainStoreUpdate<'a> { for (block_hash, block_extra) in self.chain_store_cache_update.block_extras.iter() { store_update.insert_ser(DBCol::BlockExtra, block_hash.as_ref(), block_extra)?; } - for ((height, shard_id), chunk_hash) in - self.chain_store_cache_update.chunk_hash_per_height_shard.iter() - { - let key = get_height_shard_id(*height, *shard_id); - store_update.insert_ser(DBCol::ChunkPerHeightShard, &key, chunk_hash)?; - } let mut chunk_hashes_by_height: HashMap> = HashMap::new(); for (chunk_hash, chunk) in self.chain_store_cache_update.chunks.iter() { if self.chain_store.get_chunk(chunk_hash).is_ok() { @@ -2907,7 +2836,6 @@ impl<'a> ChainStoreUpdate<'a> { chunks, partial_chunks, block_hash_per_height, - chunk_hash_per_height_shard, height_to_hashes, next_block_hashes, epoch_light_client_blocks, @@ -2949,10 +2877,6 @@ impl<'a> ChainStoreUpdate<'a> { .block_hash_per_height .put(index_to_bytes(height).to_vec(), Arc::new(epoch_id_to_hash)); } - for ((height, shard_id), chunk_hash) in chunk_hash_per_height_shard { - let key = get_height_shard_id(height, shard_id); - self.chain_store.chunk_hash_per_height_shard.put(key, chunk_hash); - } for (height, block_hash) in height_to_hashes { let bytes = index_to_bytes(height); if let Some(hash) = block_hash { diff --git a/chain/chunks/src/chunk_cache.rs b/chain/chunks/src/chunk_cache.rs index 854c76db071..79ab2a544d4 100644 --- a/chain/chunks/src/chunk_cache.rs +++ b/chain/chunks/src/chunk_cache.rs @@ -59,6 +59,9 @@ pub struct EncodedChunksCache { /// A map from a block height to chunk hashes at this height for all chunk stored in the cache /// This is used to gc chunks that are out of horizon height_map: HashMap>, + /// A map from block height to shard ID to the chunk hash we've received, so we only process + /// one chunk per shard per height. + height_to_shard_to_chunk: HashMap>, /// A map from a block hash to a set of incomplete chunks (does not have all parts and receipts yet) /// whose previous block is the block hash. incomplete_chunks: HashMap>, @@ -108,6 +111,7 @@ impl EncodedChunksCache { largest_seen_height: 0, encoded_chunks: HashMap::new(), height_map: HashMap::new(), + height_to_shard_to_chunk: HashMap::new(), incomplete_chunks: HashMap::new(), block_hash_to_chunk_headers: HashMap::new(), } @@ -180,6 +184,10 @@ impl EncodedChunksCache { .entry(chunk_header.height_created()) .or_default() .insert(chunk_hash.clone()); + self.height_to_shard_to_chunk + .entry(chunk_header.height_created()) + .or_default() + .insert(chunk_header.shard_id(), chunk_hash.clone()); self.incomplete_chunks .entry(chunk_header.prev_block_hash().clone()) .or_default() @@ -200,6 +208,14 @@ impl EncodedChunksCache { self.height_within_front_horizon(height) || self.height_within_rear_horizon(height) } + pub fn get_chunk_hash_by_height_and_shard( + &self, + height: BlockHeight, + shard_id: ShardId, + ) -> Option<&ChunkHash> { + self.height_to_shard_to_chunk.get(&height)?.get(&shard_id) + } + /// Add parts and receipts stored in a partial encoded chunk to the corresponding chunk entry, /// returning the set of part ords that were previously unknown. pub fn merge_in_partial_encoded_chunk( @@ -240,6 +256,7 @@ impl EncodedChunksCache { } } } + self.height_to_shard_to_chunk.remove(&height); } } diff --git a/chain/chunks/src/lib.rs b/chain/chunks/src/lib.rs index 0871b297059..81bc99f70bb 100644 --- a/chain/chunks/src/lib.rs +++ b/chain/chunks/src/lib.rs @@ -1701,10 +1701,11 @@ impl ShardsManager { return Err(Error::ChainError(near_chain::Error::InvalidChunkHeight)); } // We shouldn't process unrequested chunk if we have seen one with same (height_created + shard_id) but different chunk_hash - if let Ok(hash) = chain_store - .get_any_chunk_hash_by_height_shard(header.height_created(), header.shard_id()) + if let Some(hash) = self + .encoded_chunks + .get_chunk_hash_by_height_and_shard(header.height_created(), header.shard_id()) { - if hash != chunk_hash { + if hash != &chunk_hash { warn!(target: "client", "Rejecting unrequested chunk {:?}, height {}, shard_id {}, because of having {:?}", chunk_hash, header.height_created(), header.shard_id(), hash); return Err(Error::DuplicateChunkHeight); } @@ -1753,17 +1754,8 @@ impl ShardsManager { } } - // 2. Consider it valid and stores it - // Store chunk hash into chunk_hash_per_height_shard collection - let mut store_update = chain_store.store_update(); - store_update.save_chunk_hash( - header.height_created(), - header.shard_id(), - chunk_hash.clone(), - ); - store_update.commit()?; - - // Merge parts and receipts included in the partial encoded chunk into chunk cache + // 2. Consider it valid; mergeparts and receipts included in the partial encoded chunk + // into chunk cache let new_part_ords = self.encoded_chunks.merge_in_partial_encoded_chunk(partial_encoded_chunk); diff --git a/core/store/src/columns.rs b/core/store/src/columns.rs index 928ef45d67c..c4e771b750a 100644 --- a/core/store/src/columns.rs +++ b/core/store/src/columns.rs @@ -175,10 +175,9 @@ pub enum DBCol { /// - *Rows*: transaction hash /// - *Column type*: SignedTransaction Transactions, - /// Mapping from a given (Height, ShardId) to the Chunk hash. - /// - *Rows*: (Height || ShardId) - (u64 || u64) - /// - *Column type*: ChunkHash (CryptoHash) - ChunkPerHeightShard, + /// Deprecated. + #[strum(serialize = "ChunkPerHeightShard")] + _ChunkPerHeightShard, /// Changes to state (Trie) that we have recorded. /// - *Rows*: BlockHash || TrieKey (TrieKey is written via custom to_vec) /// - *Column type*: TrieKey, new value and reason for change (RawStateChangesWithTrieKey) @@ -277,7 +276,6 @@ impl DBCol { | DBCol::BlockHeader | DBCol::BlockExtra | DBCol::BlockInfo - | DBCol::ChunkPerHeightShard | DBCol::Chunks | DBCol::InvalidChunks | DBCol::PartialChunks => true, diff --git a/core/store/src/db/colddb.rs b/core/store/src/db/colddb.rs index 7a881f246fd..6ba0f25e5cf 100644 --- a/core/store/src/db/colddb.rs +++ b/core/store/src/db/colddb.rs @@ -196,14 +196,6 @@ fn get_cold_key<'a>(col: DBCol, key: &[u8], buffer: &'a mut [u8; 32]) -> Option< buffer[..8].copy_from_slice(&num.to_be_bytes()); Some(&buffer[..8]) } - DBCol::ChunkPerHeightShard => { - // Key is `little_endian(height) || ShardUId`. We’re leaving - // ShardUId alone. - let num = u64::from_le_bytes(key[..8].try_into().unwrap()); - buffer[..8].copy_from_slice(&num.to_be_bytes()); - buffer[8..16].copy_from_slice(&key[8..16]); - Some(&buffer[..16]) - } DBCol::State => { // Key is `ShardUId || CryptoHash(node_or_value)`. We’re stripping // the ShardUId. @@ -330,7 +322,6 @@ mod test { DBCol::HeaderHashesByHeight, ]; let mut ops: Vec<_> = height_columns.iter().map(|col| set(*col, HEIGHT_LE)).collect(); - ops.push(set(DBCol::ChunkPerHeightShard, &[HEIGHT_LE, SHARD].concat())); ops.push(set(DBCol::State, &[SHARD, HASH].concat())); ops.push(set(DBCol::Block, HASH)); db.write(DBTransaction { ops }).unwrap(); @@ -364,8 +355,6 @@ mod test { fetch(col, HEIGHT_LE, false); fetch(col, HEIGHT_BE, false); } - fetch(DBCol::ChunkPerHeightShard, &[HEIGHT_LE, SHARD].concat(), false); - fetch(DBCol::ChunkPerHeightShard, &[HEIGHT_BE, SHARD].concat(), false); fetch(DBCol::State, &[SHARD, HASH].concat(), false); fetch(DBCol::State, &[HASH].concat(), true); fetch(DBCol::Block, HASH, false); @@ -404,12 +393,6 @@ mod test { HeaderHashesByHeight be(42) [cold] get_raw_bytes → ∅ [raw ] get_raw_bytes → FooBar - ChunkPerHeightShard `le(42) || ShardUId` - [cold] get_raw_bytes → FooBar - [raw ] get_raw_bytes → ∅ - ChunkPerHeightShard `be(42) || ShardUId` - [cold] get_raw_bytes → ∅ - [raw ] get_raw_bytes → FooBar State `ShardUId || 11111111111111111111111111111111` [cold] get_raw_bytes → FooBar; rc: 1 [cold] get_sans_rc → FooBar diff --git a/core/store/src/db/rocksdb.rs b/core/store/src/db/rocksdb.rs index d4aff53c975..46863494fed 100644 --- a/core/store/src/db/rocksdb.rs +++ b/core/store/src/db/rocksdb.rs @@ -637,7 +637,7 @@ fn col_name(col: DBCol) -> &'static str { DBCol::ComponentEdges => "col31", DBCol::LastComponentNonce => "col32", DBCol::Transactions => "col33", - DBCol::ChunkPerHeightShard => "col34", + DBCol::_ChunkPerHeightShard => "col34", DBCol::StateChanges => "col35", DBCol::BlockRefCount => "col36", DBCol::TrieChanges => "col37", diff --git a/core/store/src/migrations.rs b/core/store/src/migrations.rs index 85abe274a5a..2cf878bb04c 100644 --- a/core/store/src/migrations.rs +++ b/core/store/src/migrations.rs @@ -173,3 +173,13 @@ pub fn migrate_29_to_30(store_opener: &StoreOpener) -> anyhow::Result<()> { set_store_version(&store, 30)?; Ok(()) } + +pub fn migrate_31_to_32(store_opener: &StoreOpener) -> anyhow::Result<()> { + let store = store_opener.open().unwrap().get_store(crate::Temperature::Hot); + let mut store_update = store.store_update(); + store_update.delete_all(DBCol::_ChunkPerHeightShard); + store_update.commit()?; + + set_store_version(&store, 32)?; + Ok(()) +} diff --git a/core/store/src/version.rs b/core/store/src/version.rs index 2f763899ed2..26434866666 100644 --- a/core/store/src/version.rs +++ b/core/store/src/version.rs @@ -2,7 +2,7 @@ pub type DbVersion = u32; /// Current version of the database. -pub const DB_VERSION: DbVersion = 31; +pub const DB_VERSION: DbVersion = 32; /// Returns serialisation of the version. /// diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 1766d10251a..b0c16fca016 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -14,7 +14,7 @@ use near_network_primitives::time; use near_primitives::block::GenesisId; #[cfg(feature = "performance_stats")] use near_rust_allocator_proxy::reset_memory_usage_max; -use near_store::migrations::{migrate_28_to_29, migrate_29_to_30}; +use near_store::migrations::{migrate_28_to_29, migrate_29_to_30, migrate_31_to_32}; use near_store::version::{set_store_version, DbVersion, DB_VERSION}; use near_store::{DBCol, Mode, NodeStorage, StoreOpener, Temperature}; use near_telemetry::TelemetryActor; @@ -167,6 +167,11 @@ fn apply_store_migrations_if_exists( info!(target: "near", "Migrate DB from version 30 to 31"); migrate_30_to_31(store_opener, &near_config)?; } + if db_version <= 31 { + // version 31 => 32: delete column ChunkPerHeightShard + info!(target: "near", "Migrate DB from version 30 to 31"); + migrate_31_to_32(store_opener)?; + } if cfg!(feature = "nightly") || cfg!(feature = "nightly_protocol") { let store = store_opener diff --git a/tools/mock-node/src/lib.rs b/tools/mock-node/src/lib.rs index 5a6abfbfd8f..0d4c92b2b0f 100644 --- a/tools/mock-node/src/lib.rs +++ b/tools/mock-node/src/lib.rs @@ -93,7 +93,12 @@ fn retrieve_starting_chunk_hash( ) -> anyhow::Result { let mut last_err = None; for height in client_start_height..target_height + 1 { - match chain.store().get_any_chunk_hash_by_height_shard(height, 0) { + match chain + .store() + .get_block_hash_by_height(height) + .and_then(|hash| chain.store().get_block(&hash)) + .map(|block| block.chunks().iter().next().unwrap().chunk_hash()) + { Ok(hash) => return Ok(hash), Err(e) => { last_err = Some(e);