Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore unknown block root events for processing blocks #5682

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::No)?.launch_stream(block_roots))
}

/// Returns `true` if the given `block_root` is known to this node but the block has not
/// yet been imported. Consults in-memory caches only; neither fork choice nor the
/// database is checked.
pub fn contains_block_not_imported(self: &Arc<Self>, block_root: &Hash256) -> bool {
    // Check the req/resp pre-import cache first; drop its read guard before
    // consulting the early attester cache.
    let awaiting_import = self
        .reqresp_pre_import_cache
        .read()
        .contains_key(block_root);
    awaiting_import || self.early_attester_cache.contains_block(*block_root)
}

pub fn get_blobs_checking_early_attester_cache(
&self,
block_root: &Hash256,
Expand Down
13 changes: 9 additions & 4 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
use logging::TimeLatch;
use parking_lot::Mutex;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use slog::{crit, debug, error, trace, warn, Logger};
use slot_clock::SlotClock;
Expand Down Expand Up @@ -415,7 +415,7 @@ impl Drop for DuplicateCacheHandle {
/// A simple cache for detecting duplicate block roots across multiple threads.
#[derive(Clone, Default)]
pub struct DuplicateCache {
inner: Arc<Mutex<HashSet<Hash256>>>,
inner: Arc<RwLock<HashSet<Hash256>>>,
}

impl DuplicateCache {
Expand All @@ -428,7 +428,7 @@ impl DuplicateCache {
/// The handle removes the entry from the cache when it is dropped. This ensures that any unclean
/// shutdowns in the worker tasks does not leave inconsistent state in the cache.
pub fn check_and_insert(&self, block_root: Hash256) -> Option<DuplicateCacheHandle> {
let mut inner = self.inner.lock();
let mut inner = self.inner.write();
if inner.insert(block_root) {
Some(DuplicateCacheHandle {
entry: block_root,
Expand All @@ -439,9 +439,14 @@ impl DuplicateCache {
}
}

/// Returns `true` if the given `block_root` currently exists in the cache.
/// Takes only a read lock, so concurrent `check` calls do not block each other.
pub fn check(&self, block_root: &Hash256) -> bool {
self.inner.read().contains(block_root)
}

/// Remove the given `block_root` from the cache.
///
/// Takes the write lock on the inner set; a root that is not present is a no-op.
pub fn remove(&self, block_root: &Hash256) {
// NOTE(review): the pre-refactor `self.inner.lock()` line (left over from the
// Mutex -> RwLock migration in this diff) is dropped here — `RwLock` has no
// `lock()` method, so only the `write()` acquisition is kept.
let mut inner = self.inner.write();
inner.remove(block_root);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
subnet_id: SubnetId,
should_import: bool,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage>>,
duplicate_cache: DuplicateCache,
seen_timestamp: Duration,
) {
let result = match self
Expand All @@ -220,6 +221,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
peer_id,
subnet_id,
reprocess_tx,
duplicate_cache,
should_import,
seen_timestamp,
);
Expand All @@ -229,6 +231,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self: Arc<Self>,
packages: Vec<GossipAttestationPackage<T::EthSpec>>,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage>>,
duplicate_cache: DuplicateCache,
) {
let attestations_and_subnets = packages
.iter()
Expand Down Expand Up @@ -287,6 +290,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
package.peer_id,
package.subnet_id,
reprocess_tx.clone(),
duplicate_cache.clone(),
package.should_import,
package.seen_timestamp,
);
Expand All @@ -303,6 +307,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
peer_id: PeerId,
subnet_id: SubnetId,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage>>,
duplicate_cache: DuplicateCache,
should_import: bool,
seen_timestamp: Duration,
) {
Expand Down Expand Up @@ -390,6 +395,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
seen_timestamp,
},
reprocess_tx,
duplicate_cache,
error,
seen_timestamp,
);
Expand All @@ -410,6 +416,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
peer_id: PeerId,
aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage>>,
duplicate_cache: DuplicateCache,
seen_timestamp: Duration,
) {
let beacon_block_root = aggregate.message.aggregate.data.beacon_block_root;
Expand All @@ -434,6 +441,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
message_id,
peer_id,
reprocess_tx,
duplicate_cache,
seen_timestamp,
);
}
Expand All @@ -442,6 +450,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self: Arc<Self>,
packages: Vec<GossipAggregatePackage<T::EthSpec>>,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage>>,
duplicate_cache: DuplicateCache,
) {
let aggregates = packages.iter().map(|package| package.aggregate.as_ref());

Expand Down Expand Up @@ -498,18 +507,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
package.message_id,
package.peer_id,
reprocess_tx.clone(),
duplicate_cache.clone(),
package.seen_timestamp,
);
}
}

#[allow(clippy::too_many_arguments)]
fn process_gossip_aggregate_result(
self: &Arc<Self>,
result: Result<VerifiedAggregate<T>, RejectedAggregate<T::EthSpec>>,
beacon_block_root: Hash256,
message_id: MessageId,
peer_id: PeerId,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage>>,
duplicate_cache: DuplicateCache,
seen_timestamp: Duration,
) {
match result {
Expand Down Expand Up @@ -592,6 +604,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
seen_timestamp,
},
reprocess_tx,
duplicate_cache,
error,
seen_timestamp,
);
Expand Down Expand Up @@ -1822,12 +1835,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {

/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
/// network.
#[allow(clippy::too_many_arguments)]
fn handle_attestation_verification_failure(
self: &Arc<Self>,
peer_id: PeerId,
message_id: MessageId,
failed_att: FailedAtt<T::EthSpec>,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage>>,
duplicate_cache: DuplicateCache,
error: AttnError,
seen_timestamp: Duration,
) {
Expand Down Expand Up @@ -2043,18 +2058,20 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
if let Some(sender) = reprocess_tx {
// We don't know the block, get the sync manager to handle the block lookup, and
// send the attestation to be scheduled for re-processing.
self.sync_tx
.send(SyncMessage::UnknownBlockHashFromAttestation(
peer_id,
*beacon_block_root,
))
.unwrap_or_else(|_| {
warn!(
self.log,
"Failed to send to sync service";
"msg" => "UnknownBlockHash"
)
});
if !duplicate_cache.check(beacon_block_root) {
self.sync_tx
.send(SyncMessage::UnknownBlockHashFromAttestation(
peer_id,
*beacon_block_root,
))
.unwrap_or_else(|_| {
warn!(
self.log,
"Failed to send to sync service";
"msg" => "UnknownBlockHash"
)
});
}
let msg = match failed_att {
FailedAtt::Aggregate {
attestation,
Expand All @@ -2072,6 +2089,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
peer_id,
attestation,
None, // Do not allow this attestation to be re-processed beyond this point.
duplicate_cache,
seen_timestamp,
)
}),
Expand All @@ -2097,6 +2115,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
subnet_id,
should_import,
None, // Do not allow this attestation to be re-processed beyond this point.
duplicate_cache,
seen_timestamp,
)
}),
Expand Down
18 changes: 16 additions & 2 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let processor = self.clone();
let process_individual = move |package: GossipAttestationPackage<T::EthSpec>| {
let reprocess_tx = processor.reprocess_tx.clone();
let duplicate_cache = processor.duplicate_cache.clone();
processor.process_gossip_attestation(
package.message_id,
package.peer_id,
package.attestation,
package.subnet_id,
package.should_import,
Some(reprocess_tx),
duplicate_cache,
package.seen_timestamp,
)
};
Expand All @@ -96,7 +98,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let processor = self.clone();
let process_batch = move |attestations| {
let reprocess_tx = processor.reprocess_tx.clone();
processor.process_gossip_attestation_batch(attestations, Some(reprocess_tx))
let duplicate_cache = processor.duplicate_cache.clone();
processor.process_gossip_attestation_batch(
attestations,
Some(reprocess_tx),
duplicate_cache,
)
};

self.try_send(BeaconWorkEvent {
Expand Down Expand Up @@ -128,11 +135,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let processor = self.clone();
let process_individual = move |package: GossipAggregatePackage<T::EthSpec>| {
let reprocess_tx = processor.reprocess_tx.clone();
let duplicate_cache = processor.duplicate_cache.clone();
processor.process_gossip_aggregate(
package.message_id,
package.peer_id,
package.aggregate,
Some(reprocess_tx),
duplicate_cache,
package.seen_timestamp,
)
};
Expand All @@ -141,7 +150,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let processor = self.clone();
let process_batch = move |aggregates| {
let reprocess_tx = processor.reprocess_tx.clone();
processor.process_gossip_aggregate_batch(aggregates, Some(reprocess_tx))
let duplicate_cache = processor.duplicate_cache.clone();
processor.process_gossip_aggregate_batch(
aggregates,
Some(reprocess_tx),
duplicate_cache,
)
};

let beacon_block_root = aggregate.message.aggregate.data.beacon_block_root;
Expand Down
8 changes: 6 additions & 2 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
slot: Slot,
block_component: BlockComponent<T::EthSpec>,
) {
match self.should_search_for_block(Some(slot), &peer_id) {
match self.should_search_for_block(Some(slot), block_root, &peer_id) {
Ok(_) => {
self.block_lookups.search_child_and_parent(
block_root,
Expand All @@ -685,7 +685,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}

fn handle_unknown_block_root(&mut self, peer_id: PeerId, block_root: Hash256) {
match self.should_search_for_block(None, &peer_id) {
match self.should_search_for_block(None, block_root, &peer_id) {
Ok(_) => {
self.block_lookups
.search_unknown_block(block_root, &[peer_id], &mut self.network);
Expand All @@ -699,6 +699,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
fn should_search_for_block(
&mut self,
block_slot: Option<Slot>,
block_root: Hash256,
peer_id: &PeerId,
) -> Result<(), &'static str> {
if !self.network_globals().sync_state.read().is_synced() {
Expand All @@ -725,6 +726,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if !self.network.is_execution_engine_online() {
return Err("execution engine offline");
}
if self.chain.contains_block_not_imported(&block_root) {
return Err("already known");
}
Ok(())
}

Expand Down
Loading