feat(l1): batch account updates in full sync #2088

Draft · wants to merge 21 commits into base: main
Changes from all commits (21 commits)
f2dd8b9
fix: full syncing
MarcosNicolau Feb 24, 2025
164e8b6
chore: lower peer timeout when requesting block headers and bodies
MarcosNicolau Feb 24, 2025
88d62ad
fix: max request lengths
MarcosNicolau Feb 24, 2025
c1b7926
chore: max bodies request constants
MarcosNicolau Feb 24, 2025
86b8f54
chore: address clippy warnings
MarcosNicolau Feb 24, 2025
4450593
feat: indexed chunks type for libdmbx
MarcosNicolau Feb 24, 2025
1347db6
feat: implement indexed chunks for receipts
MarcosNicolau Feb 24, 2025
7cc01fd
refactor: return key value pair in indexed chunk from impl
MarcosNicolau Feb 24, 2025
eaaf721
chore: address clippy warnings
MarcosNicolau Feb 24, 2025
b0b2f8f
test: libmdbx indexed chunks
MarcosNicolau Feb 24, 2025
c6b3a52
feat: time metrics for syncing
MarcosNicolau Feb 25, 2025
693950e
Merge remote-tracking branch 'origin' into fix/full-sync
MarcosNicolau Feb 25, 2025
06198bd
feat: metrics for account updates
MarcosNicolau Feb 25, 2025
36eb055
Merge branch 'fix/full-sync' into feat/syncing-benchmark
MarcosNicolau Feb 25, 2025
9e39f01
fix: add block result in payload
MarcosNicolau Feb 25, 2025
d7a1f1d
fix: get chunk last block when downloading blocks
MarcosNicolau Feb 25, 2025
256f4b3
fix: division by zero in stats
MarcosNicolau Feb 25, 2025
2338c22
Merge branch 'fix/full-sync' into full-sync-holesky
MarcosNicolau Feb 25, 2025
f29b926
Merge branch 'feat/syncing-benchmark' into full-sync-holesky
MarcosNicolau Feb 25, 2025
a016dec
Merge branch 'fix/receipts-invalid-size-libmdbx' into full-sync-holesky
MarcosNicolau Feb 25, 2025
2521f8b
feat: quick solution to batch state root validation
MarcosNicolau Feb 25, 2025
2 changes: 1 addition & 1 deletion cmd/ef_tests/blockchain/test_runner.rs
@@ -33,7 +33,7 @@ pub fn run_ef_test(test_key: &str, test: &TestUnit) {
let hash = block.hash();

// Attempt to add the block as the head of the chain
let chain_result = blockchain.add_block(block);
let chain_result = blockchain.add_block(block, true);
match chain_result {
Err(error) => {
assert!(
30 changes: 19 additions & 11 deletions crates/blockchain/blockchain.rs
@@ -48,7 +48,11 @@ impl Blockchain {
}
}

pub fn add_block(&self, block: &Block) -> Result<(), ChainError> {
pub fn add_block(
Review comment (Collaborator): I was thinking of something along the lines of adding add_blocks, which only commits the trie once. Then you can trivially add:

fn add_block(block) { add_blocks(vec![block]) }

Thoughts?
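A minimal sketch of what that suggestion could look like; the execute_and_validate and apply_batched_account_updates helpers are assumptions for illustration, not code from this PR:

// Hypothetical sketch of the reviewer's suggestion: execute a whole batch of
// blocks and apply the accumulated account updates to the state trie in a
// single commit.
pub fn add_blocks(&self, blocks: &[Block]) -> Result<(), ChainError> {
    let mut batched_updates = Vec::new();
    for block in blocks {
        // Execute and validate each block, collecting its account updates
        // instead of applying them right away (assumed helper).
        batched_updates.extend(self.execute_and_validate(block)?);
    }
    // Apply every collected update over the parent state in one trie commit and
    // validate the resulting state root against the last block's header (assumed helper).
    self.apply_batched_account_updates(blocks, &batched_updates)
}

// add_block then becomes a trivial wrapper over the batched version.
pub fn add_block(&self, block: &Block) -> Result<(), ChainError> {
    self.add_blocks(std::slice::from_ref(block))
}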

&self,
block: &Block,
should_apply_account_updates: bool,
) -> Result<u64, ChainError> {
let since = Instant::now();

let block_hash = block.header.compute_block_hash();
@@ -73,14 +77,18 @@ impl Blockchain {
validate_gas_used(&receipts, &block.header)?;

// Apply the account updates over the last block's state and compute the new state root
let new_state_root = state
.database()
.ok_or(ChainError::StoreError(StoreError::MissingStore))?
.apply_account_updates(block.header.parent_hash, &account_updates)?
.ok_or(ChainError::ParentStateNotFound)?;

// Check state root matches the one in block header after execution
validate_state_root(&block.header, new_state_root)?;
let mut account_updates_elapsed = 0;
if should_apply_account_updates {
let account_updates_started_at = std::time::Instant::now();
let new_state_root = state
.database()
.ok_or(ChainError::StoreError(StoreError::MissingStore))?
.apply_account_updates(block.header.parent_hash, &account_updates)?
.ok_or(ChainError::ParentStateNotFound)?;
account_updates_elapsed = account_updates_started_at.elapsed().as_secs();
// Check state root matches the one in block header after execution
validate_state_root(&block.header, new_state_root)?;
}

// Check receipts root matches the one in block header after execution
validate_receipts_root(&block.header, &receipts)?;
@@ -98,7 +106,7 @@ impl Blockchain {
info!("[METRIC] BLOCK EXECUTION THROUGHPUT: {throughput} Gigagas/s TIME SPENT: {interval} msecs");
}

Ok(())
Ok(account_updates_elapsed)
}

//TODO: Forkchoice Update shouldn't be part of this function
Expand All @@ -110,7 +118,7 @@ impl Blockchain {
"Adding block {} with hash {:#x}.",
block.header.number, hash
);
if let Err(error) = self.add_block(block) {
if let Err(error) = self.add_block(block, true) {
warn!(
"Failed to add block {} with hash {:#x}: {}.",
block.header.number, hash, error
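For context, a hedged sketch (not part of this diff) of how a full-sync caller might use the new should_apply_account_updates flag: execute each block of a batch without touching the state trie, then apply the updates and validate the state root once per batch. The add_block_batch function and the apply_account_updates_for_batch helper are assumptions:

// Hypothetical batched full-sync loop built on the new add_block signature.
fn add_block_batch(blockchain: &Blockchain, blocks: &[Block]) -> Result<(), ChainError> {
    for block in blocks {
        // Execute and store each block, skipping the per-block account-update
        // application and state-root check (second argument is false).
        blockchain.add_block(block, false)?;
    }
    // Apply the accumulated account updates and check the state root once,
    // against the last block of the batch (assumed batch-level helper).
    blockchain.apply_account_updates_for_batch(blocks)?;
    Ok(())
}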
26 changes: 13 additions & 13 deletions crates/blockchain/smoke_test.rs
@@ -29,7 +29,7 @@ mod blockchain_integration_test {
// Add first block. We'll make it canonical.
let block_1a = new_block(&store, &genesis_header);
let hash_1a = block_1a.hash();
blockchain.add_block(&block_1a).unwrap();
blockchain.add_block(&block_1a, true).unwrap();
store.set_canonical_block(1, hash_1a).unwrap();
let retrieved_1a = store.get_block_header(1).unwrap().unwrap();

@@ -40,7 +40,7 @@ mod blockchain_integration_test {
let block_1b = new_block(&store, &genesis_header);
let hash_1b = block_1b.hash();
blockchain
.add_block(&block_1b)
.add_block(&block_1b, true)
.expect("Could not add block 1b.");
let retrieved_1b = store.get_block_header_by_hash(hash_1b).unwrap().unwrap();

@@ -51,7 +51,7 @@ mod blockchain_integration_test {
let block_2 = new_block(&store, &block_1b.header);
let hash_2 = block_2.hash();
blockchain
.add_block(&block_2)
.add_block(&block_2, true)
.expect("Could not add block 2.");
let retrieved_2 = store.get_block_header_by_hash(hash_2).unwrap();

@@ -85,14 +85,14 @@ mod blockchain_integration_test {
// Build a single valid block.
let block_1 = new_block(&store, &genesis_header);
let hash_1 = block_1.header.compute_block_hash();
blockchain.add_block(&block_1).unwrap();
blockchain.add_block(&block_1, true).unwrap();
apply_fork_choice(&store, hash_1, H256::zero(), H256::zero()).unwrap();

// Build a child, then change its parent, making it effectively a pending block.
let mut block_2 = new_block(&store, &block_1.header);
block_2.header.parent_hash = H256::random();
let hash_2 = block_2.header.compute_block_hash();
let result = blockchain.add_block(&block_2);
let result = blockchain.add_block(&block_2, true);
assert!(matches!(result, Err(ChainError::ParentNotFound)));

// block 2 should now be pending.
@@ -118,7 +118,7 @@ mod blockchain_integration_test {
// Add first block. Not canonical.
let block_1a = new_block(&store, &genesis_header);
let hash_1a = block_1a.hash();
blockchain.add_block(&block_1a).unwrap();
blockchain.add_block(&block_1a, true).unwrap();
let retrieved_1a = store.get_block_header_by_hash(hash_1a).unwrap().unwrap();

assert!(!is_canonical(&store, 1, hash_1a).unwrap());
@@ -127,7 +127,7 @@ mod blockchain_integration_test {
let block_1b = new_block(&store, &genesis_header);
let hash_1b = block_1b.hash();
blockchain
.add_block(&block_1b)
.add_block(&block_1b, true)
.expect("Could not add block 1b.");
apply_fork_choice(&store, hash_1b, genesis_hash, genesis_hash).unwrap();
let retrieved_1b = store.get_block_header(1).unwrap().unwrap();
@@ -141,7 +141,7 @@ mod blockchain_integration_test {
let block_2 = new_block(&store, &block_1b.header);
let hash_2 = block_2.hash();
blockchain
.add_block(&block_2)
.add_block(&block_2, true)
.expect("Could not add block 2.");
apply_fork_choice(&store, hash_2, genesis_hash, genesis_hash).unwrap();
let retrieved_2 = store.get_block_header_by_hash(hash_2).unwrap();
@@ -181,14 +181,14 @@ mod blockchain_integration_test {
let block_1 = new_block(&store, &genesis_header);
let hash_1 = block_1.hash();
blockchain
.add_block(&block_1)
.add_block(&block_1, true)
.expect("Could not add block 1b.");

// Add child at height 2.
let block_2 = new_block(&store, &block_1.header);
let hash_2 = block_2.hash();
blockchain
.add_block(&block_2)
.add_block(&block_2, true)
.expect("Could not add block 2.");

assert!(!is_canonical(&store, 1, hash_1).unwrap());
@@ -229,14 +229,14 @@ mod blockchain_integration_test {
// Add block at height 1.
let block_1 = new_block(&store, &genesis_header);
blockchain
.add_block(&block_1)
.add_block(&block_1, true)
.expect("Could not add block 1b.");

// Add child at height 2.
let block_2 = new_block(&store, &block_1.header);
let hash_2 = block_2.hash();
blockchain
.add_block(&block_2)
.add_block(&block_2, true)
.expect("Could not add block 2.");

assert_eq!(latest_canonical_block_hash(&store).unwrap(), genesis_hash);
@@ -250,7 +250,7 @@ mod blockchain_integration_test {
let block_1b = new_block(&store, &genesis_header);
let hash_b = block_1b.hash();
blockchain
.add_block(&block_1b)
.add_block(&block_1b, true)
.expect("Could not add block b.");

// The latest block should be the same.
36 changes: 22 additions & 14 deletions crates/networking/p2p/peer_handler.rs
@@ -14,9 +14,7 @@ use crate::{
kademlia::{KademliaTable, PeerChannels},
rlpx::{
eth::{
blocks::{
BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, BLOCK_HEADER_LIMIT,
},
blocks::{BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders},
receipts::{GetReceipts, Receipts},
},
message::Message as RLPxMessage,
@@ -29,12 +27,21 @@ use crate::{
snap::encodable_to_proof,
};
use tracing::info;
pub const PEER_REPLY_TIMOUT: Duration = Duration::from_secs(45);
pub const PEER_REPLY_TIMEOUT: Duration = Duration::from_secs(5);
pub const PEER_SELECT_RETRY_ATTEMPTS: usize = 3;
pub const REQUEST_RETRY_ATTEMPTS: usize = 5;
pub const MAX_RESPONSE_BYTES: u64 = 512 * 1024;
pub const HASH_MAX: H256 = H256([0xFF; 32]);

// Ask for at most 128 block bodies and 192 block headers per request.
// These magic numbers are not part of the protocol and are taken from geth, see:
// https://github.com/ethereum/go-ethereum/blob/master/eth/downloader/downloader.go#L42
//
// Note: we noticed that, while bigger values are supported,
// increasing them may cause peers to disconnect.
pub const MAX_BLOCK_BODIES_TO_REQUEST: usize = 128;
pub const MAX_BLOCK_HEADERS_TO_REQUEST: usize = 192;

/// An abstraction over the [KademliaTable] containing logic to make requests to peers
#[derive(Debug, Clone)]
pub struct PeerHandler {
@@ -76,20 +83,21 @@ impl PeerHandler {
&self,
start: H256,
order: BlockRequestOrder,
limit: u64,
) -> Option<Vec<BlockHeader>> {
for _ in 0..REQUEST_RETRY_ATTEMPTS {
let request_id = rand::random();
let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
id: request_id,
startblock: start.into(),
limit: BLOCK_HEADER_LIMIT,
limit,
skip: 0,
reverse: matches!(order, BlockRequestOrder::NewToOld),
});
let peer = self.get_peer_channel_with_retry(Capability::Eth).await?;
let mut receiver = peer.receiver.lock().await;
peer.sender.send(request).await.ok()?;
if let Some(block_headers) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
if let Some(block_headers) = tokio::time::timeout(PEER_REPLY_TIMEOUT, async move {
loop {
match receiver.recv().await {
Some(RLPxMessage::BlockHeaders(BlockHeaders { id, block_headers }))
@@ -129,7 +137,7 @@ impl PeerHandler {
let peer = self.get_peer_channel_with_retry(Capability::Eth).await?;
let mut receiver = peer.receiver.lock().await;
peer.sender.send(request).await.ok()?;
if let Some(block_bodies) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
if let Some(block_bodies) = tokio::time::timeout(PEER_REPLY_TIMEOUT, async move {
loop {
match receiver.recv().await {
Some(RLPxMessage::BlockBodies(BlockBodies { id, block_bodies }))
@@ -171,7 +179,7 @@ impl PeerHandler {
let peer = self.get_peer_channel_with_retry(Capability::Eth).await?;
let mut receiver = peer.receiver.lock().await;
peer.sender.send(request).await.ok()?;
if let Some(receipts) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
if let Some(receipts) = tokio::time::timeout(PEER_REPLY_TIMEOUT, async move {
loop {
match receiver.recv().await {
Some(RLPxMessage::Receipts(Receipts { id, receipts }))
@@ -222,7 +230,7 @@ impl PeerHandler {
let peer = self.get_peer_channel_with_retry(Capability::Snap).await?;
let mut receiver = peer.receiver.lock().await;
peer.sender.send(request).await.ok()?;
if let Some((accounts, proof)) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
if let Some((accounts, proof)) = tokio::time::timeout(PEER_REPLY_TIMEOUT, async move {
loop {
match receiver.recv().await {
Some(RLPxMessage::AccountRange(AccountRange {
@@ -280,7 +288,7 @@ impl PeerHandler {
let peer = self.get_peer_channel_with_retry(Capability::Snap).await?;
let mut receiver = peer.receiver.lock().await;
peer.sender.send(request).await.ok()?;
if let Some(codes) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
if let Some(codes) = tokio::time::timeout(PEER_REPLY_TIMEOUT, async move {
loop {
match receiver.recv().await {
Some(RLPxMessage::ByteCodes(ByteCodes { id, codes }))
@@ -332,7 +340,7 @@ impl PeerHandler {
let peer = self.get_peer_channel_with_retry(Capability::Snap).await?;
let mut receiver = peer.receiver.lock().await;
peer.sender.send(request).await.ok()?;
if let Some((mut slots, proof)) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
if let Some((mut slots, proof)) = tokio::time::timeout(PEER_REPLY_TIMEOUT, async move {
loop {
match receiver.recv().await {
Some(RLPxMessage::StorageRanges(StorageRanges { id, slots, proof }))
@@ -428,7 +436,7 @@ impl PeerHandler {
let peer = self.get_peer_channel_with_retry(Capability::Snap).await?;
let mut receiver = peer.receiver.lock().await;
peer.sender.send(request).await.ok()?;
if let Some(nodes) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
if let Some(nodes) = tokio::time::timeout(PEER_REPLY_TIMEOUT, async move {
loop {
match receiver.recv().await {
Some(RLPxMessage::TrieNodes(TrieNodes { id, nodes }))
@@ -497,7 +505,7 @@ impl PeerHandler {
let peer = self.get_peer_channel_with_retry(Capability::Snap).await?;
let mut receiver = peer.receiver.lock().await;
peer.sender.send(request).await.ok()?;
if let Some(nodes) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
if let Some(nodes) = tokio::time::timeout(PEER_REPLY_TIMEOUT, async move {
loop {
match receiver.recv().await {
Some(RLPxMessage::TrieNodes(TrieNodes { id, nodes }))
@@ -559,7 +567,7 @@ impl PeerHandler {
let peer = self.get_peer_channel_with_retry(Capability::Snap).await?;
let mut receiver = peer.receiver.lock().await;
peer.sender.send(request).await.ok()?;
if let Some((mut slots, proof)) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
if let Some((mut slots, proof)) = tokio::time::timeout(PEER_REPLY_TIMEOUT, async move {
loop {
match receiver.recv().await {
Some(RLPxMessage::StorageRanges(StorageRanges { id, slots, proof }))
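For illustration, a hedged sketch (not part of this diff) of how the new MAX_BLOCK_HEADERS_TO_REQUEST constant and the limit parameter added to request_block_headers might drive a chunked header download; the loop, the OldToNew order variant, and the stopping condition are assumptions:

// Hypothetical download loop: fetch headers in chunks of at most
// MAX_BLOCK_HEADERS_TO_REQUEST, walking the chain forward from `start`.
async fn download_headers(peers: &PeerHandler, mut start: H256, target: H256) -> Vec<BlockHeader> {
    let mut all_headers = Vec::new();
    loop {
        // Ask a peer for the next chunk (request_block_headers retries internally).
        let Some(headers) = peers
            .request_block_headers(start, BlockRequestOrder::OldToNew, MAX_BLOCK_HEADERS_TO_REQUEST as u64)
            .await
        else {
            break; // no peer answered within the retry budget
        };
        let last_hash = headers.last().map(|h| h.compute_block_hash());
        all_headers.extend(headers);
        match last_hash {
            // Continue from the last header received until the target is reached.
            Some(hash) if hash != target => start = hash,
            _ => break,
        }
    }
    all_headers
}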