Fixes needed to recover Holesky testnet
povi committed Feb 26, 2025
1 parent 7e161fc commit e7caccb
Showing 10 changed files with 162 additions and 39 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions fork_choice_control/src/messages.rs
@@ -171,6 +171,7 @@ pub enum P2pMessage<P: Preset> {
Accept(GossipId),
Ignore(GossipId),
PublishBlobSidecar(Arc<BlobSidecar<P>>),
PenalizePeer(PeerId, MutatorRejectionReason),
Reject(Option<GossipId>, MutatorRejectionReason),
BlockNeeded(H256, Option<PeerId>),
FinalizedCheckpoint(Checkpoint),
22 changes: 17 additions & 5 deletions fork_choice_control/src/mutator.rs
@@ -640,12 +640,24 @@ where
Err(error) => {
warn!("block rejected (error: {error}, origin: {origin:?})");

let (gossip_id, sender) = origin.split();
let sender = match origin {
BlockOrigin::Gossip(gossip_id) => {
P2pMessage::Reject(Some(gossip_id), MutatorRejectionReason::InvalidBlock)
.send(&self.p2p_tx);

if gossip_id.is_some() {
P2pMessage::Reject(gossip_id, MutatorRejectionReason::InvalidBlock)
.send(&self.p2p_tx);
}
None
}
BlockOrigin::Api(sender) => sender,
BlockOrigin::Requested(peer_id) => {
if let Some(peer_id) = peer_id {
P2pMessage::PenalizePeer(peer_id, MutatorRejectionReason::InvalidBlock)
.send(&self.p2p_tx);
}

None
}
BlockOrigin::Own | BlockOrigin::Persisted => None,
};

if let Some(block_root) = rejected_block_root {
self.store_mut().register_rejected_block(block_root);
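Because the hunk above interleaves the removed origin.split() path with the new match, the resulting control flow is easier to see in isolation. Previously only gossiped blocks produced any network feedback on rejection; now blocks obtained via request (for example during sync) also penalize the peer that served them. Below is a minimal, self-contained sketch of the new behaviour; every type definition is a simplified stand-in for illustration, not the real grandine type:

    // Minimal sketch of the new rejection handling. All type definitions here are
    // simplified stand-ins for illustration, not the real grandine types.
    use std::sync::mpsc::Sender;

    type PeerId = u64;
    type GossipId = u64;

    enum MutatorRejectionReason {
        InvalidBlock,
    }

    enum P2pMessage {
        Reject(Option<GossipId>, MutatorRejectionReason),
        PenalizePeer(PeerId, MutatorRejectionReason),
    }

    enum BlockOrigin {
        Gossip(GossipId),
        Api(Option<Sender<()>>),
        Requested(Option<PeerId>),
        Own,
        Persisted,
    }

    fn handle_rejected_block(
        origin: BlockOrigin,
        p2p_tx: &Sender<P2pMessage>,
    ) -> Option<Sender<()>> {
        match origin {
            // Gossiped block: ask the network layer to reject the gossip message.
            BlockOrigin::Gossip(gossip_id) => {
                let _ = p2p_tx.send(P2pMessage::Reject(
                    Some(gossip_id),
                    MutatorRejectionReason::InvalidBlock,
                ));
                None
            }
            // API submission: keep the response channel so the caller can be notified.
            BlockOrigin::Api(sender) => sender,
            // Block fetched by request (e.g. during sync): penalize the serving peer.
            BlockOrigin::Requested(peer_id) => {
                if let Some(peer_id) = peer_id {
                    let _ = p2p_tx.send(P2pMessage::PenalizePeer(
                        peer_id,
                        MutatorRejectionReason::InvalidBlock,
                    ));
                }
                None
            }
            // Own or persisted blocks need no network feedback.
            BlockOrigin::Own | BlockOrigin::Persisted => None,
        }
    }

    fn main() {
        let (p2p_tx, p2p_rx) = std::sync::mpsc::channel();
        // An invalid block served by peer 42 now gets that peer penalized.
        handle_rejected_block(BlockOrigin::Requested(Some(42)), &p2p_tx);
        assert!(matches!(
            p2p_rx.recv(),
            Ok(P2pMessage::PenalizePeer(42, MutatorRejectionReason::InvalidBlock))
        ));
    }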
96 changes: 92 additions & 4 deletions fork_choice_control/src/thread_pool.rs
@@ -11,16 +11,17 @@
// - Represent priorities with numbers and add some randomness to them.
// - Store task submission times and give very old tasks a higher priority.

use std::{collections::VecDeque, sync::Arc, thread::Builder};
use std::{collections::VecDeque, sync::Arc, thread::Builder, time::Instant};

use anyhow::Result;
use derivative::Derivative;
use derive_more::From;
use execution_engine::ExecutionEngine;
use log::debug;
use parking_lot::{Condvar, Mutex};
use ssz::SszHash as _;
use std_ext::ArcExt as _;
use types::preset::Preset;
use types::{preset::Preset, traits::SignedBeaconBlock as _};

use crate::{
tasks::{
Expand Down Expand Up @@ -122,6 +123,39 @@ impl<P: Preset, E: ExecutionEngine<P> + Send, W> Run for HighPriorityTask<P, E,
}
}

impl<P: Preset, E, W> HighPriorityTask<P, E, W> {
pub fn task_name(&self) -> String {
match self {
Self::Block(task) => format!(
"BlockTask (block root: {:?}, slot: {})",
task.block.hash_tree_root(),
task.block.message().slot(),
),
Self::BlockForGossip(task) => format!(
"BlockVerifyForGossipTask (block root: {:?}, slot: {})",
task.block.hash_tree_root(),
task.block.message().slot(),
),
Self::BlobSidecar(task) => format!(
"BlobSidecarTask (block root: {:?}, index: {}, slot: {})",
task.blob_sidecar
.signed_block_header
.message
.hash_tree_root(),
task.blob_sidecar.index,
task.blob_sidecar.signed_block_header.message.slot,
),
Self::CheckpointState(task) => {
format!("CheckpointStateTask (checkpoint: {:?})", task.checkpoint)
}
Self::PreprocessState(task) => format!(
"PreprocessStateTask (head block root: {:?}, next slot: {})",
task.head_block_root, task.next_slot
),
}
}
}

#[derive(From)]
enum LowPriorityTask<P: Preset, W> {
AggregateAndProof(AggregateAndProofTask<P, W>),
@@ -143,6 +177,30 @@ impl<P: Preset, W> Run for LowPriorityTask<P, W> {
}
}

impl<P: Preset, W> LowPriorityTask<P, W> {
pub fn task_name(&self) -> String {
match self {
Self::AggregateAndProof(task) => format!(
"AggregateAndProofTask (aggregate and proof: {:?}, origin: {:?})",
task.aggregate_and_proof, task.origin,
),
Self::Attestation(task) => {
format!("AttestationTask (attestation item: {:?})", task.attestation,)
}
Self::BlockAttestations(task) => format!(
"BlockAttestationsTask (block root: {:?}, slot: {})",
task.block.hash_tree_root(),
task.block.message().slot(),
),
Self::AttesterSlashing(task) => format!(
"AttesterSlashingTask (attester slashing: {:?})",
task.attester_slashing
),
Self::PersistBlobSidecarsTask(_task) => "PersistBlobSidecarsTask".into(),
}
}
}

pub trait Spawn<P: Preset, E, W> {
fn spawn(self, critical: &mut Critical<P, E, W>);
}
@@ -220,15 +278,45 @@ fn run_worker<P: Preset, E: ExecutionEngine<P> + Send, W>(shared: &Shared<P, E,

if let Some(task) = critical.high_priority_tasks.pop_front() {
drop(critical);
debug!("thread {} received high priority task", thread_name());

let started_at = Instant::now();

debug!(
"thread {} starting high priority task {}",
thread_name(),
task.task_name(),
);

task.run_and_handle_panics();

debug!(
"thread {} finished high priority task in {} ms",
thread_name(),
started_at.elapsed().as_millis()
);

continue 'outer;
}

if let Some(task) = critical.low_priority_tasks.pop_front() {
drop(critical);
debug!("thread {} received low priority task", thread_name());

let started_at = Instant::now();

debug!(
"thread {} starting low priority task {}",
thread_name(),
task.task_name()
);

task.run_and_handle_panics();

debug!(
"thread {} finished low priority task in {} ms",
thread_name(),
started_at.elapsed().as_millis()
);

continue 'outer;
}

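The changes above give each worker-loop task a human-readable name and log how long it took to run. The timing follows the usual std::time::Instant pattern; a condensed, hedged illustration (a closure stands in for HighPriorityTask/LowPriorityTask and println! for log::debug!, so it runs without a logger being set up):

    use std::time::Instant;

    // A closure stands in for the task here; task_name mirrors the new task_name()
    // methods added above.
    fn run_timed(task_name: &str, task: impl FnOnce()) {
        let started_at = Instant::now();
        println!("starting {task_name}");

        task();

        println!(
            "finished {task_name} in {} ms",
            started_at.elapsed().as_millis()
        );
    }

    fn main() {
        run_timed("BlockTask (block root: 0x…, slot: 100)", || {
            std::thread::sleep(std::time::Duration::from_millis(5));
        });
    }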
1 change: 1 addition & 0 deletions fork_choice_store/Cargo.toml
@@ -19,6 +19,7 @@ features = { workspace = true }
futures = { workspace = true }
hash_hasher = { workspace = true }
helper_functions = { workspace = true }
hex-literal = { workspace = true }
im = { workspace = true }
itertools = { workspace = true }
kzg_utils = { workspace = true }
2 changes: 1 addition & 1 deletion fork_choice_store/src/state_cache_processor.rs
@@ -18,7 +18,7 @@ use types::{

use crate::Store;

const ALLOWED_EMPTY_SLOTS_MULTIPLIER: u64 = 2;
const ALLOWED_EMPTY_SLOTS_MULTIPLIER: u64 = 1_000_000;

pub struct StateCacheProcessor<P: Preset> {
state_cache: StateCache<P>,
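Raising ALLOWED_EMPTY_SLOTS_MULTIPLIER from 2 to 1_000_000 effectively removes the cap on how many empty slots a requested slot may be ahead of a cached state, which matters on a chain that has gone a long time without viable blocks. The sketch below shows how the constant is presumably applied; the exact check is an assumption based on the constant's name, since only the constant change appears in this diff:

    // Hedged sketch: assumed shape of the empty-slot bound in StateCacheProcessor.
    const ALLOWED_EMPTY_SLOTS_MULTIPLIER: u64 = 1_000_000;

    fn exceeds_allowed_empty_slots(requested_slot: u64, state_slot: u64, max_empty_slots: u64) -> bool {
        // Presumably the bound scales the configured max_empty_slots (default 32,
        // see store_config.rs below) by the multiplier.
        let allowed = max_empty_slots.saturating_mul(ALLOWED_EMPTY_SLOTS_MULTIPLIER);

        requested_slot.saturating_sub(state_slot) > allowed
    }

    fn main() {
        // With the old multiplier of 2 the bound was 64 slots; a gap of 10_000
        // slots would have been refused, but is well within the new limit.
        assert!(!exceeds_allowed_empty_slots(10_000, 0, 32));
    }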
57 changes: 35 additions & 22 deletions fork_choice_store/src/store.rs
@@ -210,6 +210,7 @@ pub struct Store<P: Preset> {
rejected_block_roots: HashSet<H256>,
finished_initial_forward_sync: bool,
finished_back_sync: bool,
blacklisted_blocks: HashSet<H256>,
}

impl<P: Preset> Store<P> {
@@ -254,6 +255,12 @@ impl<P: Preset> Store<P> {
let validator_count = anchor_state.validators().len_usize();
let latest_messages = core::iter::repeat_n(None, validator_count).collect();

let mut blacklisted_blocks = HashSet::new();

blacklisted_blocks.insert(H256(hex_literal::hex!(
"2db899881ed8546476d0b92c6aa9110bea9a4cd0dbeb5519eb0ea69575f1f359"
)));

Self {
chain_config,
store_config,
@@ -284,6 +291,7 @@ impl<P: Preset> Store<P> {
rejected_block_roots: HashSet::default(),
finished_initial_forward_sync,
finished_back_sync,
blacklisted_blocks,
}
}

@@ -1080,6 +1088,11 @@
) -> Result<(Arc<BeaconState<P>>, Option<BlockAction<P>>)>,
) -> Result<BlockAction<P>> {
let block_root = block.message().hash_tree_root();

if self.blacklisted_blocks.contains(&block_root) {
bail!("blacklisted block: {block_root:?}");
}

let block_action = self.validate_gossip_rules(block, block_root);

if let Some(action) = block_action {
@@ -1769,28 +1782,6 @@ impl<P: Preset> Store<P> {
return Ok(BlobSidecarAction::Ignore(true));
}

let state = match state_fn() {
Ok(state) => state,
Err(error) => {
if let Some(StateCacheError::StateFarBehind { .. }) = error.downcast_ref() {
return Ok(BlobSidecarAction::DelayUntilSlot(blob_sidecar));
}

bail!(error);
}
};

// [REJECT] The proposer signature of blob_sidecar.signed_block_header, is valid with respect to the block_header.proposer_index pubkey.
SingleVerifier.verify_singular(
blob_sidecar
.signed_block_header
.message
.signing_root(&self.chain_config, &state),
blob_sidecar.signed_block_header.signature,
accessors::public_key(&state, block_header.proposer_index)?,
SignatureKind::BlockInBlobSidecar,
)?;

// [REJECT] The sidecar's block's parent (defined by block_header.parent_root) passes validation.
// Part 1/2:
// Since our fork choice store's implementation doesn't preserve invalid blocks,
@@ -1860,6 +1851,28 @@ impl<P: Preset> Store<P> {
Error::BlobSidecarInvalid { blob_sidecar }
);

let state = match state_fn() {
Ok(state) => state,
Err(error) => {
if let Some(StateCacheError::StateFarBehind { .. }) = error.downcast_ref() {
return Ok(BlobSidecarAction::DelayUntilSlot(blob_sidecar));
}

bail!(error);
}
};

// [REJECT] The proposer signature of blob_sidecar.signed_block_header, is valid with respect to the block_header.proposer_index pubkey.
SingleVerifier.verify_singular(
blob_sidecar
.signed_block_header
.message
.signing_root(&self.chain_config, &state),
blob_sidecar.signed_block_header.signature,
accessors::public_key(&state, block_header.proposer_index)?,
SignatureKind::BlockInBlobSidecar,
)?;

if !origin.is_from_back_sync() {
// [REJECT] The sidecar is proposed by the expected proposer_index for the block's slot in the context of the current shuffling
// (defined by block_header.parent_root/block_header.slot).
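The new blacklisted_blocks set hard-codes the root of a block that must never be imported again, which is what the hex-literal dependency added to fork_choice_store's Cargo.toml is for. A minimal sketch of the mechanism, with H256 reduced to a plain 32-byte array and a String error in place of anyhow::bail! so the snippet is self-contained:

    use std::collections::HashSet;

    use hex_literal::hex; // the hex-literal crate added to fork_choice_store above

    // Simplified stand-in for the workspace H256 type.
    type H256 = [u8; 32];

    struct Store {
        blacklisted_blocks: HashSet<H256>,
    }

    impl Store {
        fn new() -> Self {
            let mut blacklisted_blocks = HashSet::new();

            // The block root hard-coded in the diff.
            blacklisted_blocks.insert(hex!(
                "2db899881ed8546476d0b92c6aa9110bea9a4cd0dbeb5519eb0ea69575f1f359"
            ));

            Self { blacklisted_blocks }
        }

        fn check_block(&self, block_root: &H256) -> Result<(), String> {
            if self.blacklisted_blocks.contains(block_root) {
                return Err(format!("blacklisted block: {block_root:02x?}"));
            }

            Ok(())
        }
    }

    fn main() {
        let store = Store::new();
        let blacklisted = hex!("2db899881ed8546476d0b92c6aa9110bea9a4cd0dbeb5519eb0ea69575f1f359");

        assert!(store.check_block(&blacklisted).is_err());
        assert!(store.check_block(&[0; 32]).is_ok());
    }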
2 changes: 1 addition & 1 deletion fork_choice_store/src/store_config.rs
@@ -11,7 +11,7 @@ pub const DEFAULT_CACHE_LOCK_TIMEOUT_MILLIS: u64 = 1500;
pub struct StoreConfig {
#[derivative(Default(value = "32"))]
pub max_empty_slots: u64,
#[derivative(Default(value = "64"))]
#[derivative(Default(value = "8"))]
pub max_epochs_to_retain_states_in_cache: u64,
#[derivative(Default(value = "Duration::from_millis(DEFAULT_CACHE_LOCK_TIMEOUT_MILLIS)"))]
pub state_cache_lock_timeout: Duration,
8 changes: 8 additions & 0 deletions p2p/src/network.rs
@@ -327,6 +327,14 @@ impl<P: Preset> Network<P> {
P2pMessage::PublishBlobSidecar(blob_sidecar) => {
self.publish_blob_sidecar(blob_sidecar);
},
P2pMessage::PenalizePeer(peer_id, mutator_rejection_reason) => {
self.report_peer(
peer_id,
PeerAction::LowToleranceError,
ReportSource::Processor,
mutator_rejection_reason,
);
}
P2pMessage::Reject(gossip_id, mutator_rejection_reason) => {
if let MutatorRejectionReason::InvalidBlobSidecar {
blob_identifier,
11 changes: 5 additions & 6 deletions p2p/src/sync_manager.rs
@@ -540,7 +540,7 @@ impl SyncManager {
}

pub fn random_peer(&self, use_black_list: bool) -> Option<PeerId> {
let chain_id = self.chain_with_max_peer_count(use_black_list)?;
let chain_id = self.chain_to_sync(use_black_list)?;

let busy_peers = self
.blob_requests
@@ -653,7 +653,7 @@ impl SyncManager {
}

fn find_chain_to_sync(&mut self, use_black_list: bool) -> Option<ChainId> {
match self.chain_with_max_peer_count(use_black_list) {
match self.chain_to_sync(use_black_list) {
Some(chain_id) => {
self.log(
Level::Debug,
@@ -693,11 +693,10 @@
peers
}

fn chain_with_max_peer_count(&self, use_black_list: bool) -> Option<ChainId> {
fn chain_to_sync(&self, use_black_list: bool) -> Option<ChainId> {
self.chains_with_peer_counts(use_black_list)
.into_iter()
.max_by_key(|(_, peer_count)| *peer_count)
.map(|(chain_id, _)| chain_id)
.into_keys()
.choose(&mut thread_rng())
}

fn peers(&self, use_black_list: bool) -> impl Iterator<Item = (&PeerId, &StatusMessage)> {
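Sync-target selection changes from "the chain with the most peers" to a uniformly random known chain, which presumably helps when the most-peered chain is one the node keeps failing to sync. A hedged sketch of old versus new selection, assuming chains_with_peer_counts returns a map keyed by ChainId and that IteratorRandom comes from the rand crate already used via thread_rng:

    use std::collections::HashMap;

    use rand::{seq::IteratorRandom, thread_rng};

    type ChainId = u64;

    // Stand-in for SyncManager::chains_with_peer_counts (assumed to return a map).
    fn chains_with_peer_counts() -> HashMap<ChainId, usize> {
        HashMap::from([(1, 12), (2, 3), (3, 7)])
    }

    // Old behaviour: always the chain with the most peers.
    fn chain_with_max_peer_count() -> Option<ChainId> {
        chains_with_peer_counts()
            .into_iter()
            .max_by_key(|(_, peer_count)| *peer_count)
            .map(|(chain_id, _)| chain_id)
    }

    // New behaviour: any known chain, chosen uniformly at random.
    fn chain_to_sync() -> Option<ChainId> {
        chains_with_peer_counts()
            .into_keys()
            .choose(&mut thread_rng())
    }

    fn main() {
        assert_eq!(chain_with_max_peer_count(), Some(1));
        assert!(chain_to_sync().is_some());
    }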
