Skip to content

Commit

Permalink
[chunks] Fix bug that produced partial chunks were not inserted into …
Browse files Browse the repository at this point in the history
…cache (#7795)

* [chunks] Fix bug that partial chunks were not inserted into cache if we produced the chunk

* Add new tests for chunk cache behavior
  • Loading branch information
robin-near authored and Nikolay Kurtov committed Nov 9, 2022
1 parent 5c647f3 commit bef3835
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 8 deletions.
123 changes: 123 additions & 0 deletions chain/chunks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2081,6 +2081,7 @@ impl ShardsManager {

pub fn distribute_encoded_chunk(
&mut self,
partial_chunk: PartialEncodedChunk,
encoded_chunk: EncodedShardChunk,
merkle_paths: &Vec<MerklePath>,
outgoing_receipts: Vec<Receipt>,
Expand Down Expand Up @@ -2150,6 +2151,7 @@ impl ShardsManager {
}

// Add it to the set of chunks to be included in the next block
self.encoded_chunks.merge_in_partial_encoded_chunk(&partial_chunk.clone().into());
self.encoded_chunks.insert_chunk_header(shard_id, chunk_header);

Ok(())
Expand Down Expand Up @@ -2180,6 +2182,7 @@ mod test {
use near_store::test_utils::create_test_store;

use super::*;
use crate::logic::persist_chunk;
use crate::test_utils::*;

/// should not request partial encoded chunk from self
Expand Down Expand Up @@ -2916,4 +2919,124 @@ mod test {
})
.is_none());
}

#[test]
fn test_chunk_cache_hit_for_produced_chunk() {
let fixture = ChunkTestFixture::default();
let mut shards_manager = ShardsManager::new(
Some(fixture.mock_shard_tracker.clone()),
fixture.mock_runtime.clone(),
fixture.mock_network.clone(),
fixture.mock_client_adapter.clone(),
fixture.chain_store.new_read_only_chunks_store(),
None,
);

shards_manager
.distribute_encoded_chunk(
fixture.make_partial_encoded_chunk(&fixture.all_part_ords),
fixture.mock_encoded_chunk.clone(),
&fixture.mock_merkle_paths,
fixture.mock_outgoing_receipts.clone(),
)
.unwrap();

let (_, source, response) =
shards_manager.prepare_partial_encoded_chunk_response(PartialEncodedChunkRequestMsg {
chunk_hash: fixture.mock_chunk_header.chunk_hash(),
part_ords: fixture.all_part_ords.clone(),
tracking_shards: HashSet::new(),
});
assert_eq!(source, "cache");
assert!(response.is_some());
assert_eq!(response.unwrap().parts.len(), fixture.all_part_ords.len());
}

#[test]
fn test_chunk_cache_hit_for_received_chunk() {
let fixture = ChunkTestFixture::default();
let mut shards_manager = ShardsManager::new(
Some(fixture.mock_shard_tracker.clone()),
fixture.mock_runtime.clone(),
fixture.mock_network.clone(),
fixture.mock_client_adapter.clone(),
fixture.chain_store.new_read_only_chunks_store(),
None,
);

shards_manager
.process_partial_encoded_chunk(
fixture.make_partial_encoded_chunk(&fixture.all_part_ords).into(),
)
.unwrap();

let (_, source, response) =
shards_manager.prepare_partial_encoded_chunk_response(PartialEncodedChunkRequestMsg {
chunk_hash: fixture.mock_chunk_header.chunk_hash(),
part_ords: fixture.all_part_ords.clone(),
tracking_shards: HashSet::new(),
});
assert_eq!(source, "cache");
assert!(response.is_some());
assert_eq!(response.unwrap().parts.len(), fixture.all_part_ords.len());
}

#[test]
fn test_chunk_response_for_uncached_partial_chunk() {
let mut fixture = ChunkTestFixture::default();
let mut shards_manager = ShardsManager::new(
Some(fixture.mock_shard_tracker.clone()),
fixture.mock_runtime.clone(),
fixture.mock_network.clone(),
fixture.mock_client_adapter.clone(),
fixture.chain_store.new_read_only_chunks_store(),
None,
);

persist_chunk(
fixture.make_partial_encoded_chunk(&fixture.all_part_ords),
None,
&mut fixture.chain_store,
)
.unwrap();

let (_, source, response) =
shards_manager.prepare_partial_encoded_chunk_response(PartialEncodedChunkRequestMsg {
chunk_hash: fixture.mock_chunk_header.chunk_hash(),
part_ords: fixture.all_part_ords.clone(),
tracking_shards: HashSet::new(),
});
assert_eq!(source, "partial");
assert!(response.is_some());
assert_eq!(response.unwrap().parts.len(), fixture.all_part_ords.len());
}

#[test]
fn test_chunk_response_for_uncached_shard_chunk() {
let mut fixture = ChunkTestFixture::default();
let mut shards_manager = ShardsManager::new(
Some(fixture.mock_shard_tracker.clone()),
fixture.mock_runtime.clone(),
fixture.mock_network.clone(),
fixture.mock_client_adapter.clone(),
fixture.chain_store.new_read_only_chunks_store(),
None,
);

let mut update = fixture.chain_store.store_update();
let shard_chunk =
fixture.mock_encoded_chunk.decode_chunk(fixture.mock_runtime.num_data_parts()).unwrap();
update.save_chunk(shard_chunk);
update.commit().unwrap();

let (_, source, response) =
shards_manager.prepare_partial_encoded_chunk_response(PartialEncodedChunkRequestMsg {
chunk_hash: fixture.mock_chunk_header.chunk_hash(),
part_ords: fixture.all_part_ords.clone(),
tracking_shards: HashSet::new(),
});
assert_eq!(source, "chunk");
assert!(response.is_some());
assert_eq!(response.unwrap().parts.len(), fixture.all_part_ords.len());
}
}
24 changes: 18 additions & 6 deletions chain/chunks/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use actix::MailboxError;
use futures::future::BoxFuture;
use futures::FutureExt;
use near_network::types::MsgRecipient;
use near_primitives::receipt::Receipt;
use near_primitives::time::Clock;

use near_chain::test_utils::{KeyValueRuntime, ValidatorSchedule};
Expand All @@ -14,10 +15,10 @@ use near_crypto::KeyType;
use near_network::test_utils::MockPeerManagerAdapter;
use near_primitives::block::BlockHeader;
use near_primitives::hash::{self, CryptoHash};
use near_primitives::merkle;
use near_primitives::merkle::{self, MerklePath};
use near_primitives::sharding::{
ChunkHash, PartialEncodedChunk, PartialEncodedChunkPart, PartialEncodedChunkV2,
ReedSolomonWrapper, ShardChunkHeader,
ChunkHash, EncodedShardChunk, PartialEncodedChunk, PartialEncodedChunkPart,
PartialEncodedChunkV2, ReedSolomonWrapper, ShardChunkHeader,
};
use near_primitives::types::NumShards;
use near_primitives::types::{AccountId, EpochId, ShardId};
Expand Down Expand Up @@ -141,7 +142,11 @@ pub struct ChunkTestFixture {
pub mock_network: Arc<MockPeerManagerAdapter>,
pub mock_client_adapter: Arc<MockClientAdapterForShardsManager>,
pub chain_store: ChainStore,
pub all_part_ords: Vec<u64>,
pub mock_part_ords: Vec<u64>,
pub mock_merkle_paths: Vec<MerklePath>,
pub mock_outgoing_receipts: Vec<Receipt>,
pub mock_encoded_chunk: EncodedShardChunk,
pub mock_chunk_part_owner: AccountId,
pub mock_shard_tracker: AccountId,
pub mock_chunk_header: ShardChunkHeader,
Expand Down Expand Up @@ -238,7 +243,7 @@ impl ChunkTestFixture {
let shard_layout = mock_runtime.get_shard_layout(&EpochId::default()).unwrap();
let receipts_hashes = Chain::build_receipts_hashes(&receipts, &shard_layout);
let (receipts_root, _) = merkle::merklize(&receipts_hashes);
let (mock_chunk, mock_merkles) = ShardsManager::create_encoded_shard_chunk(
let (mock_chunk, mock_merkle_paths) = ShardsManager::create_encoded_shard_chunk(
mock_parent_hash,
Default::default(),
Default::default(),
Expand Down Expand Up @@ -267,16 +272,23 @@ impl ChunkTestFixture {
mock_runtime.get_part_owner(&mock_epoch_id, *p).unwrap() == mock_chunk_part_owner
})
.collect();
let encoded_chunk =
mock_chunk.create_partial_encoded_chunk(all_part_ords, Vec::new(), &mock_merkles);
let encoded_chunk = mock_chunk.create_partial_encoded_chunk(
all_part_ords.clone(),
Vec::new(),
&mock_merkle_paths,
);
let chain_store = ChainStore::new(mock_runtime.get_store(), 0, true);

ChunkTestFixture {
mock_runtime,
mock_network,
mock_client_adapter,
chain_store,
all_part_ords,
mock_part_ords,
mock_encoded_chunk: mock_chunk,
mock_merkle_paths,
mock_outgoing_receipts: receipts,
mock_chunk_part_owner,
mock_shard_tracker,
mock_chunk_header: encoded_chunk.cloned_header(),
Expand Down
9 changes: 7 additions & 2 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1272,8 +1272,13 @@ impl Client {
self.me.as_ref(),
self.runtime_adapter.as_ref(),
)?;
persist_chunk(partial_chunk, Some(shard_chunk), self.chain.mut_store())?;
self.shards_mgr.distribute_encoded_chunk(encoded_chunk, &merkle_paths, receipts)?;
persist_chunk(partial_chunk.clone(), Some(shard_chunk), self.chain.mut_store())?;
self.shards_mgr.distribute_encoded_chunk(
partial_chunk,
encoded_chunk,
&merkle_paths,
receipts,
)?;
Ok(())
}

Expand Down

0 comments on commit bef3835

Please sign in to comment.