Skip to content

Commit

Permalink
[chunks] Move ChainStore use out of ShardsManager and into the Client (
Browse files Browse the repository at this point in the history
…#7695)

* Draft for splitting chunks store

* Fix a test

* Unwrap ChunksLogic struct

* Fix formatting
  • Loading branch information
robin-near authored Sep 29, 2022
1 parent 79278e2 commit 73de01b
Show file tree
Hide file tree
Showing 15 changed files with 519 additions and 403 deletions.
60 changes: 60 additions & 0 deletions chain/chain/src/chunks_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::sync::Arc;

use borsh::BorshDeserialize;
use near_cache::CellLruCache;
use near_chain_primitives::Error;
use near_primitives::sharding::{ChunkHash, PartialEncodedChunk, ShardChunk};
use near_store::{DBCol, Store};

#[cfg(not(feature = "no_cache"))]
const CHUNK_CACHE_SIZE: usize = 1024;
#[cfg(feature = "no_cache")]
const CHUNK_CACHE_SIZE: usize = 1;

pub struct ReadOnlyChunksStore {
store: Store,
partial_chunks: CellLruCache<Vec<u8>, Arc<PartialEncodedChunk>>,
chunks: CellLruCache<Vec<u8>, Arc<ShardChunk>>,
}

impl ReadOnlyChunksStore {
pub fn new(store: Store) -> Self {
Self {
store,
partial_chunks: CellLruCache::new(CHUNK_CACHE_SIZE),
chunks: CellLruCache::new(CHUNK_CACHE_SIZE),
}
}

fn read_with_cache<'a, T: BorshDeserialize + Clone + 'a>(
&self,
col: DBCol,
cache: &'a CellLruCache<Vec<u8>, T>,
key: &[u8],
) -> std::io::Result<Option<T>> {
if let Some(value) = cache.get(key) {
return Ok(Some(value));
}
if let Some(result) = self.store.get_ser::<T>(col, key)? {
cache.put(key.to_vec(), result.clone());
return Ok(Some(result));
}
Ok(None)
}
pub fn get_partial_chunk(
&self,
chunk_hash: &ChunkHash,
) -> Result<Arc<PartialEncodedChunk>, Error> {
match self.read_with_cache(DBCol::PartialChunks, &self.partial_chunks, chunk_hash.as_ref())
{
Ok(Some(shard_chunk)) => Ok(shard_chunk),
_ => Err(Error::ChunkMissing(chunk_hash.clone())),
}
}
pub fn get_chunk(&self, chunk_hash: &ChunkHash) -> Result<Arc<ShardChunk>, Error> {
match self.read_with_cache(DBCol::Chunks, &self.chunks, chunk_hash.as_ref()) {
Ok(Some(shard_chunk)) => Ok(shard_chunk),
_ => Err(Error::ChunkMissing(chunk_hash.clone())),
}
}
}
1 change: 1 addition & 0 deletions chain/chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub use types::{Block, BlockHeader, BlockStatus, ChainGenesis, Provenance, Runti
mod block_processing_utils;
pub mod blocks_delay_tracker;
pub mod chain;
pub mod chunks_store;
pub mod crypto_hash_timer;
mod doomslug;
mod lightclient;
Expand Down
5 changes: 5 additions & 0 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use near_store::{
LATEST_KNOWN_KEY, TAIL_KEY,
};

use crate::chunks_store::ReadOnlyChunksStore;
use crate::types::{Block, BlockHeader, LatestKnown};
use crate::{byzantine_assert, RuntimeAdapter};
use near_store::db::StoreStatistics;
Expand Down Expand Up @@ -399,6 +400,10 @@ impl ChainStore {
}
}

pub fn new_read_only_chunks_store(&self) -> ReadOnlyChunksStore {
ReadOnlyChunksStore::new(self.store.clone())
}

pub fn store_update(&mut self) -> ChainStoreUpdate<'_> {
ChainStoreUpdate::new(self)
}
Expand Down
29 changes: 24 additions & 5 deletions chain/chunks/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,32 @@
use actix::Message;
use near_network::types::MsgRecipient;
use near_primitives::sharding::ShardChunkHeader;
use near_primitives::sharding::{EncodedShardChunk, PartialEncodedChunk, ShardChunk};

pub trait ClientAdapterForShardsManager {
fn did_complete_chunk(
&self,
partial_chunk: PartialEncodedChunk,
shard_chunk: Option<ShardChunk>,
);
fn saw_invalid_chunk(&self, chunk: EncodedShardChunk);
}

#[derive(Message)]
#[rtype(result = "()")]
pub enum ShardsManagerResponse {
ChunkCompleted(ShardChunkHeader),
ChunkCompleted { partial_chunk: PartialEncodedChunk, shard_chunk: Option<ShardChunk> },
InvalidChunk(EncodedShardChunk),
}

pub trait ClientAdapterForShardsManager: MsgRecipient<ShardsManagerResponse> {}

impl<A: MsgRecipient<ShardsManagerResponse>> ClientAdapterForShardsManager for A {}
impl<A: MsgRecipient<ShardsManagerResponse>> ClientAdapterForShardsManager for A {
fn did_complete_chunk(
&self,
partial_chunk: PartialEncodedChunk,
shard_chunk: Option<ShardChunk>,
) {
self.do_send(ShardsManagerResponse::ChunkCompleted { partial_chunk, shard_chunk });
}
fn saw_invalid_chunk(&self, chunk: EncodedShardChunk) {
self.do_send(ShardsManagerResponse::InvalidChunk(chunk));
}
}
Loading

0 comments on commit 73de01b

Please sign in to comment.