Skip to content

Commit

Permalink
fix: ensure logs returned by RPC match filter (#507)
Browse files Browse the repository at this point in the history
* fix: ensure logs returned by RPC match filter

* replace custom proof impl and triehash with alloy-trie

* fix some clippy warnings

* address review comments
  • Loading branch information
eshaan7 authored Feb 10, 2025
1 parent 19b841a commit 32f47de
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 12 deletions.
2 changes: 2 additions & 0 deletions core/src/execution/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub enum ExecutionError {
BlockReceiptsRootMismatch(BlockTag),
#[error("filter not found: 0x{0:x}")]
FilterNotFound(U256),
#[error("log does not match filter")]
LogFilterMismatch(),
}

/// Errors that can occur during evm.rs calls
Expand Down
80 changes: 70 additions & 10 deletions core/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
ExecutionError::TooManyLogsToProve(logs.len(), MAX_SUPPORTED_LOGS_NUMBER).into(),
);
}

self.ensure_logs_match_filter(&logs, &filter).await?;
self.verify_logs(&logs).await?;
Ok(logs)
}
Expand All @@ -309,7 +309,7 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
// only concerned with filters created via helios
return Err(ExecutionError::FilterNotFound(filter_id).into());
}
Some(FilterType::Logs) => {
Some(FilterType::Logs(filter)) => {
// underlying RPC takes care of keeping track of changes
let filter_changes = self.rpc.get_filter_changes(filter_id).await?;
let logs = filter_changes.as_logs().unwrap_or(&[]);
Expand All @@ -320,6 +320,7 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
)
.into());
}
self.ensure_logs_match_filter(logs, filter).await?;
self.verify_logs(logs).await?;
FilterChanges::Logs(logs.to_vec())
}
Expand Down Expand Up @@ -351,14 +352,27 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
}

pub async fn get_filter_logs(&self, filter_id: U256) -> Result<Vec<Log>> {
let logs = self.rpc.get_filter_logs(filter_id).await?;
if logs.len() > MAX_SUPPORTED_LOGS_NUMBER {
return Err(
ExecutionError::TooManyLogsToProve(logs.len(), MAX_SUPPORTED_LOGS_NUMBER).into(),
);
let filter_type = self.state.get_filter(&filter_id).await;

match &filter_type {
Some(FilterType::Logs(filter)) => {
let logs = self.rpc.get_filter_logs(filter_id).await?;
if logs.len() > MAX_SUPPORTED_LOGS_NUMBER {
return Err(ExecutionError::TooManyLogsToProve(
logs.len(),
MAX_SUPPORTED_LOGS_NUMBER,
)
.into());
}
self.ensure_logs_match_filter(&logs, filter).await?;
self.verify_logs(&logs).await?;
Ok(logs)
}
_ => {
// only concerned with filters created via helios
return Err(ExecutionError::FilterNotFound(filter_id).into());

Check failure on line 373 in core/src/execution/mod.rs

View workflow job for this annotation

GitHub Actions / clippy

unneeded `return` statement
}
}
self.verify_logs(&logs).await?;
Ok(logs)
}

pub async fn uninstall_filter(&self, filter_id: U256) -> Result<bool> {
Expand All @@ -385,7 +399,9 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
let filter_id = self.rpc.new_filter(&filter).await?;

// record the filter in the state
self.state.push_filter(filter_id, FilterType::Logs).await;
self.state
.push_filter(filter_id, FilterType::Logs(filter))
.await;

Ok(filter_id)
}
Expand Down Expand Up @@ -413,6 +429,50 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
Ok(filter_id)
}

/// Ensure that each log entry in the given array of logs match the given filter.
async fn ensure_logs_match_filter(&self, logs: &[Log], filter: &Filter) -> Result<()> {
fn log_matches_filter(log: &Log, filter: &Filter) -> bool {
if let Some(block_hash) = filter.get_block_hash() {
if log.block_hash.unwrap() != block_hash {
return false;
}
}
if let Some(from_block) = filter.get_from_block() {
if log.block_number.unwrap() < from_block {
return false;
}
}
if let Some(to_block) = filter.get_to_block() {
if log.block_number.unwrap() > to_block {
return false;
}
}
if !filter.address.matches(&log.address()) {
return false;
}
for (i, topic) in filter.topics.iter().enumerate() {
if let Some(log_topic) = log.topics().get(i) {
if !topic.matches(log_topic) {
return false;
}
} else {
// if filter topic is not present in log, it's a mismatch
return false;
}
}
true
}
for log in logs {
if !log_matches_filter(log, filter) {
return Err(ExecutionError::LogFilterMismatch().into());
}
}
Ok(())
}

/// Verify the integrity of each log entry in the given array of logs by
/// checking its inclusion in the corresponding transaction receipt
/// and verifying the transaction receipt itself against the block's receipt root.
async fn verify_logs(&self, logs: &[Log]) -> Result<()> {
// Collect all (unique) block numbers
let block_nums = logs
Expand Down
5 changes: 3 additions & 2 deletions core/src/execution/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use alloy::{
consensus::BlockHeader,
network::{primitives::HeaderResponse, BlockResponse},
primitives::{Address, B256, U256},
rpc::types::BlockTransactions,
rpc::types::{BlockTransactions, Filter},
};
use eyre::{eyre, Result};
use tokio::{
Expand Down Expand Up @@ -373,7 +373,8 @@ struct TransactionLocation {

#[derive(Clone)]
pub enum FilterType {

Check failure on line 375 in core/src/execution/state.rs

View workflow job for this annotation

GitHub Actions / clippy

large size difference between variants
Logs,
// filter content
Logs(Filter),
// block number when the filter was created or last queried
NewBlock(u64),
PendingTransactions,
Expand Down

0 comments on commit 32f47de

Please sign in to comment.