From 3b8258ef4f90ec2f89a7a45179ad2195d8c77d42 Mon Sep 17 00:00:00 2001 From: Noah Citron Date: Tue, 8 Oct 2024 14:40:40 -0400 Subject: [PATCH 1/8] wip --- Cargo.lock | 2 ++ core/src/types.rs | 36 ++++++++++++++++++++++++++++++++++-- opstack/Cargo.toml | 2 ++ opstack/src/consensus.rs | 17 +++++++++++++---- opstack/src/types.rs | 4 +++- 5 files changed, 54 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cbb7255d..4960b28b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3137,6 +3137,7 @@ name = "helios-opstack" version = "0.7.0" dependencies = [ "alloy", + "alloy-rlp", "axum", "clap", "discv5", @@ -3162,6 +3163,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "triehash-ethereum", "typenum", "unsigned-varint", "url", diff --git a/core/src/types.rs b/core/src/types.rs index e219bf1b..f88de57f 100644 --- a/core/src/types.rs +++ b/core/src/types.rs @@ -1,7 +1,8 @@ use std::fmt::Display; +use alloy::consensus::{Header, EMPTY_OMMER_ROOT_HASH}; use alloy::network::TransactionResponse; -use alloy::primitives::{Address, Bytes, B256, U256, U64}; +use alloy::primitives::{Address, Bloom, Bytes, B256, U256, U64, FixedBytes}; use serde::{de::Error, ser::SerializeSeq, Deserialize, Serialize}; #[derive(Deserialize, Serialize, Debug, Clone, Default)] @@ -17,7 +18,7 @@ pub struct Block { pub logs_bloom: Bytes, pub miner: Address, pub mix_hash: B256, - pub nonce: String, + pub nonce: FixedBytes<8>, pub parent_hash: B256, pub receipts_root: B256, pub sha3_uncles: B256, @@ -28,10 +29,41 @@ pub struct Block { pub transactions: Transactions, pub transactions_root: B256, pub uncles: Vec, + pub withdrawals_root: B256, pub blob_gas_used: Option, pub excess_blob_gas: Option, } +impl Block { + pub fn is_hash_valid(&self) -> bool { + let header = Header { + parent_hash: self.parent_hash, + ommers_hash: EMPTY_OMMER_ROOT_HASH, + beneficiary: self.miner, + state_root: self.state_root, + transactions_root: self.transactions_root, + receipts_root: self.receipts_root, + withdrawals_root: Some(self.withdrawals_root), + logs_bloom: Bloom::from_slice(&self.logs_bloom), + difficulty: self.difficulty, + number: self.number.to(), + gas_limit: self.gas_limit.to(), + gas_used: self.gas_used.to(), + timestamp: self.timestamp.to(), + mix_hash: self.mix_hash, + nonce: self.nonce, + base_fee_per_gas: Some(self.base_fee_per_gas.to()), + blob_gas_used: self.blob_gas_used.map(|v| v.to()), + excess_blob_gas: self.excess_blob_gas.map(|v| v.to()), + parent_beacon_block_root: None, + requests_root: None, + extra_data: self.extra_data.clone(), + }; + + header.hash_slow() == self.hash + } +} + #[derive(Deserialize, Debug, Clone)] pub enum Transactions { Hashes(Vec), diff --git a/opstack/Cargo.toml b/opstack/Cargo.toml index 0930257e..85702587 100644 --- a/opstack/Cargo.toml +++ b/opstack/Cargo.toml @@ -26,6 +26,8 @@ sha2.workspace = true ethereum_ssz_derive.workspace = true ethereum_ssz.workspace = true ssz_types.workspace = true +triehash-ethereum.workspace = true +alloy-rlp = "0.3.0" op-alloy-network = { git = "https://github.com/alloy-rs/op-alloy", tag = "v0.1.5" } op-alloy-consensus = { git = "https://github.com/alloy-rs/op-alloy", tag = "v0.1.5" } op-alloy-rpc-types = { git = "https://github.com/alloy-rs/op-alloy", tag = "v0.1.5" } diff --git a/opstack/src/consensus.rs b/opstack/src/consensus.rs index 972191d8..6c2be6df 100644 --- a/opstack/src/consensus.rs +++ b/opstack/src/consensus.rs @@ -1,9 +1,10 @@ use std::time::Duration; use alloy::consensus::Transaction as TxTrait; -use alloy::primitives::{b256, keccak256, Address, B256, U256, U64}; +use alloy::primitives::{b256, fixed_bytes, keccak256, Address, B256, U256, U64}; use alloy::rlp::Decodable; use alloy::rpc::types::{Parity, Signature, Transaction}; +use alloy_rlp::encode; use eyre::Result; use op_alloy_consensus::OpTxEnvelope; use tokio::sync::mpsc::Sender; @@ -11,6 +12,7 @@ use tokio::sync::{ mpsc::{channel, Receiver}, watch, }; +use triehash_ethereum::ordered_trie_root; use zduny_wasm_timer::{Delay, SystemTime, UNIX_EPOCH}; use helios_core::consensus::Consensus; @@ -131,7 +133,7 @@ impl Inner { } fn payload_to_block(value: ExecutionPayload) -> Result> { - let empty_nonce = "0x0000000000000000".to_string(); + let empty_nonce = fixed_bytes!("0000000000000000"); let empty_uncle_hash = b256!("1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"); @@ -282,6 +284,12 @@ fn payload_to_block(value: ExecutionPayload) -> Result> { }) .collect::>>()?; + let txs = Transactions::Full(txs); + let txs_root = ordered_trie_root(txs.hashes()); + + let withdrawals = value.withdrawals.iter().map(|v| encode(v)); + let withdrawals_root = ordered_trie_root(withdrawals); + Ok(Block { number: U64::from(value.block_number), base_fee_per_gas: value.base_fee_per_gas, @@ -297,12 +305,13 @@ fn payload_to_block(value: ExecutionPayload) -> Result> { state_root: value.state_root, timestamp: U64::from(value.timestamp), total_difficulty: U64::ZERO, - transactions: Transactions::Full(txs), + transactions: txs, mix_hash: value.prev_randao, nonce: empty_nonce, sha3_uncles: empty_uncle_hash, size: U64::ZERO, - transactions_root: B256::default(), + transactions_root: B256::from_slice(txs_root.as_bytes()), + withdrawals_root: B256::from_slice(withdrawals_root.as_bytes()), uncles: vec![], blob_gas_used: Some(U64::from(value.blob_gas_used)), excess_blob_gas: Some(U64::from(value.excess_blob_gas)), diff --git a/opstack/src/types.rs b/opstack/src/types.rs index be8c0d79..896ce4da 100644 --- a/opstack/src/types.rs +++ b/opstack/src/types.rs @@ -1,4 +1,5 @@ use alloy::primitives::{Address, B256, U256}; +use alloy_rlp::RlpEncodable; use ssz_derive::{Decode, Encode}; use ssz_types::{FixedVector, VariableList}; @@ -27,10 +28,11 @@ pub type Transaction = VariableList; pub type LogsBloom = FixedVector; pub type ExtraData = VariableList; -#[derive(Clone, Debug, Encode, Decode)] +#[derive(Clone, Debug, Encode, Decode, RlpEncodable)] pub struct Withdrawal { index: u64, validator_index: u64, address: Address, amount: u64, } + From d5788aead1e7b221b4cc9f08504eb4b26f6fdecd Mon Sep 17 00:00:00 2001 From: Noah Citron Date: Wed, 9 Oct 2024 17:57:37 -0400 Subject: [PATCH 2/8] feat: add execution block backfill --- Cargo.lock | 2 + core/src/client/node.rs | 2 +- core/src/execution/mod.rs | 4 +- core/src/execution/rpc/http_rpc.rs | 14 ++- core/src/execution/rpc/mock_rpc.rs | 6 +- core/src/execution/rpc/mod.rs | 3 +- core/src/execution/state.rs | 117 ++++++++++++++++++++--- core/src/types.rs | 17 +++- ethereum/Cargo.toml | 1 + ethereum/consensus-core/Cargo.toml | 1 + ethereum/consensus-core/src/types/mod.rs | 3 +- ethereum/src/consensus.rs | 17 +++- opstack/src/consensus.rs | 7 +- opstack/src/types.rs | 1 - 14 files changed, 161 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4960b28b..7aca684b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3057,6 +3057,7 @@ name = "helios-consensus-core" version = "0.7.0" dependencies = [ "alloy", + "alloy-rlp", "bls12_381", "ethereum_ssz 0.6.0", "ethereum_ssz_derive 0.6.0", @@ -3128,6 +3129,7 @@ dependencies = [ "tokio", "tracing", "tree_hash", + "triehash-ethereum", "wasm-bindgen-futures", "zduny-wasm-timer", ] diff --git a/core/src/client/node.rs b/core/src/client/node.rs index 2c17287c..a1bf7123 100644 --- a/core/src/client/node.rs +++ b/core/src/client/node.rs @@ -25,7 +25,7 @@ impl> Node { let block_recv = consensus.block_recv().take().unwrap(); let finalized_block_recv = consensus.finalized_block_recv().take().unwrap(); - let state = State::new(block_recv, finalized_block_recv, 256); + let state = State::new(block_recv, finalized_block_recv, 256, execution_rpc); let execution = Arc::new( ExecutionClient::new(execution_rpc, state).map_err(ClientError::InternalError)?, ); diff --git a/core/src/execution/mod.rs b/core/src/execution/mod.rs index f0f66c45..fcd26ebb 100644 --- a/core/src/execution/mod.rs +++ b/core/src/execution/mod.rs @@ -31,11 +31,11 @@ mod proof; #[derive(Clone)] pub struct ExecutionClient> { pub rpc: R, - state: State, + state: State, } impl> ExecutionClient { - pub fn new(rpc: &str, state: State) -> Result { + pub fn new(rpc: &str, state: State) -> Result { let rpc: R = ExecutionRpc::new(rpc)?; Ok(ExecutionClient:: { rpc, state }) } diff --git a/core/src/execution/rpc/http_rpc.rs b/core/src/execution/rpc/http_rpc.rs index ecd04dad..4eb52b96 100644 --- a/core/src/execution/rpc/http_rpc.rs +++ b/core/src/execution/rpc/http_rpc.rs @@ -5,13 +5,13 @@ use alloy::rpc::types::{BlockId, EIP1186AccountProofResponse, FeeHistory, Filter use alloy::transports::http::Http; use alloy::transports::layers::{RetryBackoffLayer, RetryBackoffService}; use async_trait::async_trait; -use eyre::Result; +use eyre::{eyre, Result}; use reqwest::Client; use revm::primitives::AccessList; use crate::errors::RpcError; use crate::network_spec::NetworkSpec; -use crate::types::BlockTag; +use crate::types::{Block, BlockTag}; use super::ExecutionRpc; @@ -190,4 +190,14 @@ impl ExecutionRpc for HttpRpc { .await .map_err(|e| RpcError::new("fee_history", e))?) } + + async fn get_block(&self, hash: B256) -> Result> { + self.provider + .raw_request::<_, Option>>( + "eth_getBlockByHash".into(), + (hash, true), + ) + .await? + .ok_or(eyre!("block not found")) + } } diff --git a/core/src/execution/rpc/mock_rpc.rs b/core/src/execution/rpc/mock_rpc.rs index fae60b6f..17c977bc 100644 --- a/core/src/execution/rpc/mock_rpc.rs +++ b/core/src/execution/rpc/mock_rpc.rs @@ -7,7 +7,7 @@ use eyre::{eyre, Result}; use super::ExecutionRpc; use crate::network_spec::NetworkSpec; -use crate::types::BlockTag; +use crate::types::{Block, BlockTag}; #[derive(Clone)] pub struct MockRpc { @@ -89,6 +89,10 @@ impl ExecutionRpc for MockRpc { Err(eyre!("not implemented")) } + async fn get_block(&self, _hash: B256) -> Result> { + Err(eyre!("not implemented")) + } + async fn get_fee_history( &self, _block_count: u64, diff --git a/core/src/execution/rpc/mod.rs b/core/src/execution/rpc/mod.rs index 686b68a4..56706dc4 100644 --- a/core/src/execution/rpc/mod.rs +++ b/core/src/execution/rpc/mod.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use eyre::Result; use crate::network_spec::NetworkSpec; -use crate::types::BlockTag; +use crate::types::{Block, BlockTag}; pub mod http_rpc; pub mod mock_rpc; @@ -40,6 +40,7 @@ pub trait ExecutionRpc: Send + Clone + Sync + 'static { async fn get_new_block_filter(&self) -> Result; async fn get_new_pending_transaction_filter(&self) -> Result; async fn chain_id(&self) -> Result; + async fn get_block(&self, hash: B256) -> Result>; async fn get_fee_history( &self, diff --git a/core/src/execution/state.rs b/core/src/execution/state.rs index 3efdffd5..67c28c8e 100644 --- a/core/src/execution/state.rs +++ b/core/src/execution/state.rs @@ -4,26 +4,32 @@ use std::{ }; use alloy::primitives::{Address, B256, U256}; +use eyre::{eyre, Result}; use tokio::{ select, sync::{mpsc::Receiver, watch, RwLock}, }; +use tracing::{info, warn}; use crate::network_spec::NetworkSpec; use crate::types::{Block, BlockTag, Transactions}; +use super::rpc::ExecutionRpc; + #[derive(Clone)] -pub struct State { - inner: Arc>>, +pub struct State> { + inner: Arc>>, } -impl State { +impl> State { pub fn new( mut block_recv: Receiver>, mut finalized_block_recv: watch::Receiver>>, history_length: u64, + rpc: &str, ) -> Self { - let inner = Arc::new(RwLock::new(Inner::new(history_length))); + let rpc = R::new(rpc).unwrap(); + let inner = Arc::new(RwLock::new(Inner::new(history_length, rpc))); let inner_ref = inner.clone(); #[cfg(not(target_arch = "wasm32"))] @@ -36,13 +42,13 @@ impl State { select! { block = block_recv.recv() => { if let Some(block) = block { - inner_ref.write().await.push_block(block); + inner_ref.write().await.push_block(block).await; } }, _ = finalized_block_recv.changed() => { let block = finalized_block_recv.borrow_and_update().clone(); if let Some(block) = block { - inner_ref.write().await.push_finalized_block(block); + inner_ref.write().await.push_finalized_block(block).await; } }, @@ -54,7 +60,7 @@ impl State { } pub async fn push_block(&self, block: Block) { - self.inner.write().await.push_block(block); + self.inner.write().await.push_block(block).await; } // full block fetch @@ -152,27 +158,65 @@ impl State { } } -#[derive(Default)] -struct Inner { +struct Inner> { blocks: BTreeMap>, finalized_block: Option>, hashes: HashMap, txs: HashMap, history_length: u64, + rpc: R, } -impl Inner { - pub fn new(history_length: u64) -> Self { +impl> Inner { + pub fn new(history_length: u64, rpc: R) -> Self { Self { history_length, blocks: BTreeMap::default(), finalized_block: None, hashes: HashMap::default(), txs: HashMap::default(), + rpc, } } - pub fn push_block(&mut self, block: Block) { + pub async fn push_block(&mut self, block: Block) { + let block_number = block.number.to::(); + if self.try_insert_tip(block) { + let mut n = block_number; + + loop { + if let Ok(backfilled) = self.backfill_behind(n).await { + if !backfilled { + break; + } + n -= 1; + } else { + self.prune_before(n); + break; + } + } + + let link_child = self.blocks.get(&n); + let link_parent = self.blocks.get(&(n - 1)); + + if let (Some(parent), Some(child)) = (link_parent, link_child) { + if child.parent_hash != parent.hash { + warn!("detected block reorganization"); + self.prune_before(n); + } + } + + self.prune(); + } + } + + fn try_insert_tip(&mut self, block: Block) -> bool { + if let Some((num, _)) = self.blocks.last_key_value() { + if num > &block.number.to() { + return false; + } + } + self.hashes.insert(block.hash, block.number.to()); block .transactions @@ -188,7 +232,10 @@ impl Inner { }); self.blocks.insert(block.number.to(), block); + true + } + fn prune(&mut self) { while self.blocks.len() as u64 > self.history_length { if let Some((number, _)) = self.blocks.first_key_value() { self.remove_block(*number); @@ -196,16 +243,56 @@ impl Inner { } } - pub fn push_finalized_block(&mut self, block: Block) { + fn prune_before(&mut self, n: u64) { + loop { + if let Some((oldest, _)) = self.blocks.first_key_value() { + let oldest = *oldest; + if oldest < n { + self.blocks.remove(&oldest); + } else { + break; + } + } else { + break; + } + } + } + + async fn backfill_behind(&mut self, n: u64) -> Result { + if self.blocks.len() < 2 { + return Ok(false); + } + + if let Some(block) = self.blocks.get(&n) { + let prev = n - 1; + if self.blocks.get(&prev).is_none() { + let backfilled = self.rpc.get_block(block.parent_hash).await?; + if backfilled.is_hash_valid() && block.parent_hash == backfilled.hash { + info!("backfilled: block={}", backfilled.number); + self.blocks.insert(backfilled.number.to(), backfilled); + Ok(true) + } else { + warn!("bad block backfill"); + Err(eyre!("bad backfill")) + } + } else { + Ok(false) + } + } else { + Ok(false) + } + } + + pub async fn push_finalized_block(&mut self, block: Block) { self.finalized_block = Some(block.clone()); if let Some(old_block) = self.blocks.get(&block.number.to()) { if old_block.hash != block.hash { self.remove_block(old_block.number.to()); - self.push_block(block) + self.push_block(block).await; } } else { - self.push_block(block); + self.push_block(block).await; } } diff --git a/core/src/types.rs b/core/src/types.rs index f88de57f..b15390c2 100644 --- a/core/src/types.rs +++ b/core/src/types.rs @@ -2,7 +2,7 @@ use std::fmt::Display; use alloy::consensus::{Header, EMPTY_OMMER_ROOT_HASH}; use alloy::network::TransactionResponse; -use alloy::primitives::{Address, Bloom, Bytes, B256, U256, U64, FixedBytes}; +use alloy::primitives::{Address, Bloom, Bytes, FixedBytes, B256, U256, U64}; use serde::{de::Error, ser::SerializeSeq, Deserialize, Serialize}; #[derive(Deserialize, Serialize, Debug, Clone, Default)] @@ -32,6 +32,7 @@ pub struct Block { pub withdrawals_root: B256, pub blob_gas_used: Option, pub excess_blob_gas: Option, + pub parent_beacon_block_root: Option, } impl Block { @@ -55,7 +56,7 @@ impl Block { base_fee_per_gas: Some(self.base_fee_per_gas.to()), blob_gas_used: self.blob_gas_used.map(|v| v.to()), excess_blob_gas: self.excess_blob_gas.map(|v| v.to()), - parent_beacon_block_root: None, + parent_beacon_block_root: self.parent_beacon_block_root, requests_root: None, extra_data: self.extra_data.clone(), }; @@ -64,7 +65,7 @@ impl Block { } } -#[derive(Deserialize, Debug, Clone)] +#[derive(Debug, Clone)] pub enum Transactions { Hashes(Vec), Full(Vec), @@ -111,6 +112,16 @@ impl Serialize for Transactions { } } +impl<'de, T: TransactionResponse + Deserialize<'de>> Deserialize<'de> for Transactions { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let txs: Vec = serde::Deserialize::deserialize(deserializer)?; + Ok(Transactions::Full(txs)) + } +} + #[derive(Debug, Clone, Copy)] pub enum BlockTag { Latest, diff --git a/ethereum/Cargo.toml b/ethereum/Cargo.toml index d111f773..6c232101 100644 --- a/ethereum/Cargo.toml +++ b/ethereum/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" # consensus tree_hash.workspace = true revm.workspace = true +triehash-ethereum.workspace = true # config figment = { version = "0.10.7", features = ["toml", "env"] } diff --git a/ethereum/consensus-core/Cargo.toml b/ethereum/consensus-core/Cargo.toml index aaa24f22..ab448a9c 100644 --- a/ethereum/consensus-core/Cargo.toml +++ b/ethereum/consensus-core/Cargo.toml @@ -11,6 +11,7 @@ alloy = { version = "0.2.1", features = [ "rlp", "k256", ] } +alloy-rlp = "0.3.0" bls12_381.workspace = true ssz_types.workspace = true ethereum_ssz_derive.workspace = true diff --git a/ethereum/consensus-core/src/types/mod.rs b/ethereum/consensus-core/src/types/mod.rs index da29dbf8..f11d3903 100644 --- a/ethereum/consensus-core/src/types/mod.rs +++ b/ethereum/consensus-core/src/types/mod.rs @@ -1,4 +1,5 @@ use alloy::primitives::{Address, FixedBytes, B256, U256}; +use alloy_rlp::RlpEncodable; use eyre::Result; use serde::{Deserialize, Serialize}; use ssz_derive::Encode; @@ -182,7 +183,7 @@ impl Default for ExecutionPayloadHeader { } } -#[derive(Default, Clone, Debug, Encode, TreeHash, Deserialize)] +#[derive(Default, Clone, Debug, Encode, TreeHash, Deserialize, RlpEncodable)] pub struct Withdrawal { #[serde(with = "serde_utils::u64")] index: u64, diff --git a/ethereum/src/consensus.rs b/ethereum/src/consensus.rs index aeb473c9..a880a1a4 100644 --- a/ethereum/src/consensus.rs +++ b/ethereum/src/consensus.rs @@ -3,8 +3,8 @@ use std::process; use std::sync::Arc; use alloy::consensus::{Transaction as TxTrait, TxEnvelope}; -use alloy::primitives::{b256, B256, U256, U64}; -use alloy::rlp::Decodable; +use alloy::primitives::{b256, fixed_bytes, B256, U256, U64}; +use alloy::rlp::{encode, Decodable}; use alloy::rpc::types::{Parity, Signature, Transaction}; use chrono::Duration; use eyre::eyre; @@ -12,6 +12,7 @@ use eyre::Result; use futures::future::join_all; use tracing::{debug, error, info, warn}; use tree_hash::TreeHash; +use triehash_ethereum::ordered_trie_root; use zduny_wasm_timer::{SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc::channel; @@ -521,7 +522,7 @@ impl Inner { } fn payload_to_block(value: ExecutionPayload) -> Block { - let empty_nonce = "0x0000000000000000".to_string(); + let empty_nonce = fixed_bytes!("0000000000000000"); let empty_uncle_hash = b256!("1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"); @@ -620,6 +621,12 @@ fn payload_to_block(value: ExecutionPayload) -> Block { }) .collect::>(); + let raw_txs = value.transactions().iter().map(|tx| tx.inner.to_vec()); + let txs_root = ordered_trie_root(raw_txs); + + let withdrawals = value.withdrawals().unwrap().iter().map(encode); + let withdrawals_root = ordered_trie_root(withdrawals); + Block { number: U64::from(*value.block_number()), base_fee_per_gas: *value.base_fee_per_gas(), @@ -640,10 +647,12 @@ fn payload_to_block(value: ExecutionPayload) -> Block { nonce: empty_nonce, sha3_uncles: empty_uncle_hash, size: U64::ZERO, - transactions_root: B256::default(), + transactions_root: B256::from_slice(txs_root.as_bytes()), uncles: vec![], blob_gas_used: value.blob_gas_used().map(|v| U64::from(*v)).ok(), excess_blob_gas: value.excess_blob_gas().map(|v| U64::from(*v)).ok(), + withdrawals_root: B256::from_slice(withdrawals_root.as_bytes()), + parent_beacon_block_root: Some(*value.parent_hash()), } } diff --git a/opstack/src/consensus.rs b/opstack/src/consensus.rs index 6c2be6df..983360c1 100644 --- a/opstack/src/consensus.rs +++ b/opstack/src/consensus.rs @@ -284,8 +284,8 @@ fn payload_to_block(value: ExecutionPayload) -> Result> { }) .collect::>>()?; - let txs = Transactions::Full(txs); - let txs_root = ordered_trie_root(txs.hashes()); + let raw_txs = value.transactions.iter().map(|tx| tx.to_vec()); + let txs_root = ordered_trie_root(raw_txs); let withdrawals = value.withdrawals.iter().map(|v| encode(v)); let withdrawals_root = ordered_trie_root(withdrawals); @@ -305,7 +305,7 @@ fn payload_to_block(value: ExecutionPayload) -> Result> { state_root: value.state_root, timestamp: U64::from(value.timestamp), total_difficulty: U64::ZERO, - transactions: txs, + transactions: Transactions::Full(txs), mix_hash: value.prev_randao, nonce: empty_nonce, sha3_uncles: empty_uncle_hash, @@ -315,5 +315,6 @@ fn payload_to_block(value: ExecutionPayload) -> Result> { uncles: vec![], blob_gas_used: Some(U64::from(value.blob_gas_used)), excess_blob_gas: Some(U64::from(value.excess_blob_gas)), + parent_beacon_block_root: None, }) } diff --git a/opstack/src/types.rs b/opstack/src/types.rs index 896ce4da..a059a8fb 100644 --- a/opstack/src/types.rs +++ b/opstack/src/types.rs @@ -35,4 +35,3 @@ pub struct Withdrawal { address: Address, amount: u64, } - From 8a4227506ad1ab2ed65279f2147e6d9e29c9b27d Mon Sep 17 00:00:00 2001 From: Noah Citron Date: Wed, 9 Oct 2024 19:03:08 -0400 Subject: [PATCH 3/8] handle finalized blocks --- core/src/execution/state.rs | 10 ++++------ ethereum/src/consensus.rs | 40 ++++++++++--------------------------- 2 files changed, 15 insertions(+), 35 deletions(-) diff --git a/core/src/execution/state.rs b/core/src/execution/state.rs index 67c28c8e..3b89afa3 100644 --- a/core/src/execution/state.rs +++ b/core/src/execution/state.rs @@ -284,16 +284,14 @@ impl> Inner { } pub async fn push_finalized_block(&mut self, block: Block) { - self.finalized_block = Some(block.clone()); - if let Some(old_block) = self.blocks.get(&block.number.to()) { if old_block.hash != block.hash { - self.remove_block(old_block.number.to()); - self.push_block(block).await; + self.blocks = BTreeMap::new(); } - } else { - self.push_block(block).await; } + + self.finalized_block = Some(block.clone()); + self.push_block(block).await; } fn remove_block(&mut self, number: u64) { diff --git a/ethereum/src/consensus.rs b/ethereum/src/consensus.rs index a880a1a4..d56fec17 100644 --- a/ethereum/src/consensus.rs +++ b/ethereum/src/consensus.rs @@ -21,8 +21,7 @@ use tokio::sync::mpsc::Sender; use tokio::sync::watch; use helios_consensus_core::{ - apply_bootstrap, apply_finality_update, apply_optimistic_update, apply_update, - calc_sync_period, + apply_bootstrap, apply_finality_update, apply_update, calc_sync_period, errors::ConsensusError, expected_current_slot, get_bits, types::{ExecutionPayload, FinalityUpdate, LightClientStore, OptimisticUpdate, Update}, @@ -305,10 +304,6 @@ impl Inner { self.verify_finality_update(&finality_update)?; self.apply_finality_update(&finality_update); - let optimistic_update = self.rpc.get_optimistic_update().await?; - self.verify_optimistic_update(&optimistic_update)?; - self.apply_optimistic_update(&optimistic_update); - info!( target: "helios::consensus", "consensus client in sync with checkpoint: 0x{}", @@ -323,10 +318,6 @@ impl Inner { self.verify_finality_update(&finality_update)?; self.apply_finality_update(&finality_update); - let optimistic_update = self.rpc.get_optimistic_update().await?; - self.verify_optimistic_update(&optimistic_update)?; - self.apply_optimistic_update(&optimistic_update); - if self.store.next_sync_committee.is_none() { debug!(target: "helios::consensus", "checking for sync committee update"); let current_period = calc_sync_period(self.store.finalized_header.beacon.slot); @@ -421,16 +412,6 @@ impl Inner { ) } - fn verify_optimistic_update(&self, update: &OptimisticUpdate) -> Result<()> { - verify_optimistic_update( - update, - self.expected_current_slot(), - &self.store, - self.config.chain.genesis_root, - &self.config.forks, - ) - } - pub fn apply_update(&mut self, update: &Update) { let new_checkpoint = apply_update(&mut self.store, update); if new_checkpoint.is_some() { @@ -439,19 +420,20 @@ impl Inner { } fn apply_finality_update(&mut self, update: &FinalityUpdate) { + let prev_finalized_slot = self.store.finalized_header.slot; + let prev_optimistic_slot = self.store.optimistic_header.slot; let new_checkpoint = apply_finality_update(&mut self.store, update); + let new_finalized_slot = self.store.finalized_header.slot; + let new_optimistic_slot = self.store.optimistic_header.slot; if new_checkpoint.is_some() { self.last_checkpoint = new_checkpoint; } - self.log_finality_update(update); - } - - fn apply_optimistic_update(&mut self, update: &OptimisticUpdate) { - let new_checkpoint = apply_optimistic_update(&mut self.store, update); - if new_checkpoint.is_some() { - self.last_checkpoint = new_checkpoint; + if new_finalized_slot != prev_finalized_slot { + self.log_finality_update(update); + } + if new_optimistic_slot != prev_optimistic_slot { + self.log_optimistic_update(update) } - self.log_optimistic_update(update); } fn log_finality_update(&self, update: &FinalityUpdate) { @@ -472,7 +454,7 @@ impl Inner { ); } - fn log_optimistic_update(&self, update: &OptimisticUpdate) { + fn log_optimistic_update(&self, update: &FinalityUpdate) { let participation = get_bits(&update.sync_aggregate.sync_committee_bits) as f32 / 512f32 * 100f32; let decimals = if participation == 100.0 { 1 } else { 2 }; From 66cce2d6fc74aa69c8a0dadfbc3ef9f4ca20cf6c Mon Sep 17 00:00:00 2001 From: Noah Citron Date: Wed, 9 Oct 2024 19:04:34 -0400 Subject: [PATCH 4/8] cleanup --- ethereum/src/consensus.rs | 26 ++------------------------ 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/ethereum/src/consensus.rs b/ethereum/src/consensus.rs index d56fec17..84beabd2 100644 --- a/ethereum/src/consensus.rs +++ b/ethereum/src/consensus.rs @@ -24,8 +24,8 @@ use helios_consensus_core::{ apply_bootstrap, apply_finality_update, apply_update, calc_sync_period, errors::ConsensusError, expected_current_slot, get_bits, - types::{ExecutionPayload, FinalityUpdate, LightClientStore, OptimisticUpdate, Update}, - verify_bootstrap, verify_finality_update, verify_optimistic_update, verify_update, + types::{ExecutionPayload, FinalityUpdate, LightClientStore, Update}, + verify_bootstrap, verify_finality_update, verify_update, }; use helios_core::consensus::Consensus; use helios_core::types::{Block, Transactions}; @@ -808,28 +808,6 @@ mod tests { ); } - #[tokio::test] - async fn test_verify_optimistic() { - let client = get_client(false, true).await; - - let update = client.rpc.get_optimistic_update().await.unwrap(); - client.verify_optimistic_update(&update).unwrap(); - } - - #[tokio::test] - async fn test_verify_optimistic_invalid_sig() { - let client = get_client(false, true).await; - - let mut update = client.rpc.get_optimistic_update().await.unwrap(); - update.sync_aggregate.sync_committee_signature = Signature::default(); - - let err = client.verify_optimistic_update(&update).err().unwrap(); - assert_eq!( - err.to_string(), - ConsensusError::InvalidSignature.to_string() - ); - } - #[tokio::test] #[should_panic] async fn test_verify_checkpoint_age_invalid() { From 14ad4169a6e832e7e168955c09b331cd5cbe2925 Mon Sep 17 00:00:00 2001 From: Noah Citron Date: Thu, 10 Oct 2024 12:25:11 -0400 Subject: [PATCH 5/8] fix timing in wasm --- Cargo.lock | 20 ++++++- core/Cargo.toml | 6 +- core/src/client/mod.rs | 6 +- core/src/client/node.rs | 4 +- core/src/lib.rs | 1 + core/src/time.rs | 9 +++ ethereum/Cargo.toml | 1 - ethereum/consensus-core/Cargo.toml | 3 +- ethereum/consensus-core/src/consensus_core.rs | 15 +++-- ethereum/src/consensus.rs | 16 +++-- helios-ts/index.html | 58 +++++++++++-------- helios-ts/lib.ts | 8 +++ opstack/Cargo.toml | 2 - opstack/src/consensus.rs | 10 +++- 14 files changed, 108 insertions(+), 51 deletions(-) create mode 100644 core/src/time.rs diff --git a/Cargo.lock b/Cargo.lock index 7aca684b..ade7212c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3072,6 +3072,7 @@ dependencies = [ "tree_hash", "tree_hash_derive", "typenum", + "wasmtimer", "zduny-wasm-timer", ] @@ -3097,7 +3098,8 @@ dependencies = [ "tracing", "triehash-ethereum", "wasm-bindgen-futures", - "zduny-wasm-timer", + "wasmtimer", + "web-sys", ] [[package]] @@ -3131,7 +3133,6 @@ dependencies = [ "tree_hash", "triehash-ethereum", "wasm-bindgen-futures", - "zduny-wasm-timer", ] [[package]] @@ -3170,7 +3171,6 @@ dependencies = [ "unsigned-varint", "url", "wasm-bindgen-futures", - "zduny-wasm-timer", ] [[package]] @@ -7685,6 +7685,20 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasmtimer" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f656cd8858a5164932d8a90f936700860976ec21eb00e0fe2aa8cab13f6b4cf" +dependencies = [ + "futures", + "js-sys", + "parking_lot 0.12.3", + "pin-utils", + "slab", + "wasm-bindgen", +] + [[package]] name = "web-sys" version = "0.3.72" diff --git a/core/Cargo.toml b/core/Cargo.toml index dfaf0c80..2a420265 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -24,7 +24,6 @@ eyre.workspace = true hex.workspace = true tracing.workspace = true thiserror.workspace = true -zduny-wasm-timer.workspace = true [target.'cfg(not(target_arch = "wasm32"))'.dependencies] jsonrpsee = { version = "0.19.0", features = ["full"] } @@ -33,6 +32,11 @@ openssl.workspace = true [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4.33" gloo-timers = "0.3.0" +wasmtimer = "0.2.0" [target.wasm32-unknown-unknown.dependencies] parking_lot = { version = "0.12.2" } + +[dependencies.web-sys] +version = "0.3" +features = ["console", "Storage", "Window"] diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index c74441a3..6d1a31f8 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -6,13 +6,13 @@ use alloy::primitives::{Address, Bytes, B256, U256}; use alloy::rpc::types::{Filter, Log, SyncStatus}; use eyre::Result; use tracing::{info, warn}; -use zduny_wasm_timer::Delay; use crate::client::node::Node; #[cfg(not(target_arch = "wasm32"))] use crate::client::rpc::Rpc; use crate::consensus::Consensus; use crate::network_spec::NetworkSpec; +use crate::time::interval; use crate::types::{Block, BlockTag}; pub mod node; @@ -192,12 +192,12 @@ impl> Client { } pub async fn wait_synced(&self) { + let mut interval = interval(Duration::from_millis(100)); loop { + interval.tick().await; if let Ok(SyncStatus::None) = self.syncing().await { break; } - - Delay::new(Duration::from_millis(100)).await.unwrap(); } } } diff --git a/core/src/client/node.rs b/core/src/client/node.rs index a1bf7123..5b82c199 100644 --- a/core/src/client/node.rs +++ b/core/src/client/node.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use alloy::primitives::{Address, Bytes, B256, U256}; use alloy::rpc::types::{Filter, Log, SyncInfo, SyncStatus}; use eyre::{eyre, Result}; -use zduny_wasm_timer::{SystemTime, UNIX_EPOCH}; use crate::consensus::Consensus; use crate::errors::ClientError; @@ -12,6 +11,7 @@ use crate::execution::rpc::http_rpc::HttpRpc; use crate::execution::state::State; use crate::execution::ExecutionClient; use crate::network_spec::NetworkSpec; +use crate::time::{SystemTime, UNIX_EPOCH}; use crate::types::{Block, BlockTag}; pub struct Node> { @@ -239,7 +239,7 @@ impl> Node { async fn check_head_age(&self) -> Result<(), ClientError> { let timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) - .unwrap() + .unwrap_or_else(|_| panic!("unreachable")) .as_secs(); let block_timestamp = self diff --git a/core/src/lib.rs b/core/src/lib.rs index bcd61ab6..91378f0a 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -2,6 +2,7 @@ pub mod client; pub mod consensus; pub mod errors; pub mod network_spec; +pub mod time; pub mod types; mod execution; diff --git a/core/src/time.rs b/core/src/time.rs new file mode 100644 index 00000000..59d0a99f --- /dev/null +++ b/core/src/time.rs @@ -0,0 +1,9 @@ +#[cfg(not(target_arch = "wasm32"))] +pub use std::time::{SystemTime, UNIX_EPOCH}; +#[cfg(not(target_arch = "wasm32"))] +pub use tokio::time::{interval, interval_at, Instant}; +#[cfg(target_arch = "wasm32")] +pub use wasmtimer::{ + std::{Instant, SystemTime, UNIX_EPOCH}, + tokio::{interval, interval_at}, +}; diff --git a/ethereum/Cargo.toml b/ethereum/Cargo.toml index 6c232101..6e464a3d 100644 --- a/ethereum/Cargo.toml +++ b/ethereum/Cargo.toml @@ -32,7 +32,6 @@ tracing.workspace = true chrono.workspace = true thiserror.workspace = true superstruct.workspace = true -zduny-wasm-timer.workspace = true retri.workspace = true helios-core = { path = "../core" } diff --git a/ethereum/consensus-core/Cargo.toml b/ethereum/consensus-core/Cargo.toml index ab448a9c..aec62064 100644 --- a/ethereum/consensus-core/Cargo.toml +++ b/ethereum/consensus-core/Cargo.toml @@ -28,6 +28,5 @@ tracing.workspace = true zduny-wasm-timer.workspace = true [target.'cfg(target_arch = "wasm32")'.dependencies] -# Building consensus-core for wasm requires getrandom with the js feature. -# Source: https://github.com/alloy-rs/core?tab=readme-ov-file#wasm-support getrandom = { version = "0.2", features = ["js"] } +wasmtimer = "0.2.0" diff --git a/ethereum/consensus-core/src/consensus_core.rs b/ethereum/consensus-core/src/consensus_core.rs index ff2bc582..e5ec0056 100644 --- a/ethereum/consensus-core/src/consensus_core.rs +++ b/ethereum/consensus-core/src/consensus_core.rs @@ -1,11 +1,14 @@ use std::cmp; +#[cfg(not(target_arch = "wasm32"))] +use std::time::{SystemTime, UNIX_EPOCH}; use alloy::primitives::B256; use eyre::Result; use ssz_types::BitVector; use tracing::{info, warn}; use tree_hash::TreeHash; -use zduny_wasm_timer::{SystemTime, UNIX_EPOCH}; +#[cfg(target_arch = "wasm32")] +use wasmtimer::std::{SystemTime, UNIX_EPOCH}; use crate::errors::ConsensusError; use crate::proof::{ @@ -309,10 +312,14 @@ fn verify_generic_update( } pub fn expected_current_slot(now: SystemTime, genesis_time: u64) -> u64 { - let now = now.duration_since(UNIX_EPOCH).unwrap(); - let since_genesis = now - std::time::Duration::from_secs(genesis_time); + let now = now + .duration_since(UNIX_EPOCH) + .unwrap_or_else(|_| panic!("unreachable")) + .as_secs(); - since_genesis.as_secs() / 12 + let since_genesis = now - genesis_time; + + since_genesis / 12 } pub fn calc_sync_period(slot: u64) -> u64 { diff --git a/ethereum/src/consensus.rs b/ethereum/src/consensus.rs index 84beabd2..71c3e151 100644 --- a/ethereum/src/consensus.rs +++ b/ethereum/src/consensus.rs @@ -13,7 +13,6 @@ use futures::future::join_all; use tracing::{debug, error, info, warn}; use tree_hash::TreeHash; use triehash_ethereum::ordered_trie_root; -use zduny_wasm_timer::{SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc::channel; use tokio::sync::mpsc::Receiver; @@ -28,6 +27,7 @@ use helios_consensus_core::{ verify_bootstrap, verify_finality_update, verify_update, }; use helios_core::consensus::Consensus; +use helios_core::time::{interval_at, Instant, SystemTime, UNIX_EPOCH}; use helios_core::types::{Block, Transactions}; use crate::config::checkpoints::CheckpointFallback; @@ -136,10 +136,11 @@ impl ConsensusClient { _ = inner.send_blocks().await; + let start = Instant::now() + inner.duration_until_next_update().to_std().unwrap(); + let mut interval = interval_at(start, std::time::Duration::from_secs(12)); + loop { - zduny_wasm_timer::Delay::new(inner.duration_until_next_update().to_std().unwrap()) - .await - .unwrap(); + interval.tick().await; let res = inner.advance().await; if let Err(err) = res { @@ -360,7 +361,7 @@ impl Inner { let now = SystemTime::now() .duration_since(UNIX_EPOCH) - .unwrap() + .unwrap_or_else(|_| panic!("unreachable")) .as_secs(); let time_to_next_slot = next_slot_timestamp - now; @@ -474,7 +475,10 @@ impl Inner { fn age(&self, slot: u64) -> Duration { let expected_time = self.slot_timestamp(slot); - let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_else(|_| panic!("unreachable")); + let delay = now - std::time::Duration::from_secs(expected_time); chrono::Duration::from_std(delay).unwrap() } diff --git a/helios-ts/index.html b/helios-ts/index.html index c0eea8ad..ca02e17e 100644 --- a/helios-ts/index.html +++ b/helios-ts/index.html @@ -111,7 +111,7 @@

Base