Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify ChainSync actions under one enum #2180

Merged
merged 10 commits into from
Nov 13, 2023
159 changes: 77 additions & 82 deletions substrate/client/network/sync/src/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,30 +184,13 @@ struct GapSync<B: BlockT> {
target: NumberFor<B>,
}

/// Action that the parent of [`ChainSync`] should perform after reporting imported blocks with
/// [`ChainSync::on_blocks_processed`].
pub enum BlockRequestAction<B: BlockT> {
/// Send block request to peer. Always implies dropping a stale block request to the same peer.
SendRequest { peer_id: PeerId, request: BlockRequest<B> },
/// Drop stale block request.
RemoveStale { peer_id: PeerId },
}

/// Action that the parent of [`ChainSync`] should perform if we want to import blocks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImportBlocksAction<B: BlockT> {
struct ImportBlocksAction<B: BlockT> {
pub origin: BlockOrigin,
pub blocks: Vec<IncomingBlock<B>>,
}

/// Action that the parent of [`ChainSync`] should perform if we want to import justifications.
pub struct ImportJustificationsAction<B: BlockT> {
pub peer_id: PeerId,
pub hash: B::Hash,
pub number: NumberFor<B>,
pub justifications: Justifications,
}

/// Result of [`ChainSync::on_block_data`].
#[derive(Debug, Clone, PartialEq, Eq)]
enum OnBlockData<Block: BlockT> {
Expand Down Expand Up @@ -242,30 +225,24 @@ enum OnStateData<Block: BlockT> {
Continue,
}

/// Action that the parent of [`ChainSync`] should perform after reporting block response with
/// [`ChainSync::on_block_response`].
pub enum OnBlockResponse<B: BlockT> {
/// Nothing to do.
Nothing,
/// Perform block request.
/// Action that the parent of [`ChainSync`] should perform after reporting a network or block event.
#[derive(Debug)]
pub enum ChainSyncAction<B: BlockT> {
/// Send block request to peer. Always implies dropping a stale block request to the same peer.
SendBlockRequest { peer_id: PeerId, request: BlockRequest<B> },
/// Drop stale block request.
CancelBlockRequest { peer_id: PeerId },
/// Peer misbehaved. Disconnect, report it and cancel the block request to it.
DropPeer(BadPeer),
/// Import blocks.
ImportBlocks(ImportBlocksAction<B>),
ImportBlocks { origin: BlockOrigin, blocks: Vec<IncomingBlock<B>> },
/// Import justifications.
ImportJustifications(ImportJustificationsAction<B>),
/// Invalid block response, the peer should be disconnected and reported.
DisconnectPeer(BadPeer),
}

/// Action that the parent of [`ChainSync`] should perform after reporting state response with
/// [`ChainSync::on_state_response`].
pub enum OnStateResponse<B: BlockT> {
/// Nothing to do.
Nothing,
/// Import blocks.
ImportBlocks(ImportBlocksAction<B>),
/// Invalid state response, the peer should be disconnected and reported.
DisconnectPeer(BadPeer),
ImportJustifications {
peer_id: PeerId,
hash: B::Hash,
number: NumberFor<B>,
justifications: Justifications,
},
}

/// The main data structure which contains all the state for a chains
Expand Down Expand Up @@ -313,6 +290,8 @@ pub struct ChainSync<B: BlockT, Client> {
import_existing: bool,
/// Gap download process.
gap_sync: Option<GapSync<B>>,
/// Pending actions.
actions: Vec<ChainSyncAction<B>>,
}

/// All the data we have about a Peer that we are trying to sync with
Expand Down Expand Up @@ -427,6 +406,7 @@ where
gap_sync: None,
warp_sync_config,
warp_sync_target_block_header: None,
actions: Vec::new(),
};

sync.reset_sync_start_point()?;
Expand Down Expand Up @@ -509,8 +489,17 @@ where
}

/// Notify syncing state machine that a new sync peer has connected.
pub fn new_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
match self.new_peer_inner(peer_id, best_hash, best_number) {
Ok(Some(request)) =>
self.actions.push(ChainSyncAction::SendBlockRequest { peer_id, request }),
Ok(None) => {},
Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)),
}
}

#[must_use]
pub fn new_peer(
fn new_peer_inner(
&mut self,
peer_id: PeerId,
best_hash: B::Hash,
Expand Down Expand Up @@ -1196,8 +1185,7 @@ where
}

/// Notify that a sync peer has disconnected.
#[must_use]
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Option<ImportBlocksAction<B>> {
pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
self.blocks.clear_peer_download(peer_id);
if let Some(gap_sync) = &mut self.gap_sync {
gap_sync.blocks.clear_peer_download(peer_id)
Expand All @@ -1212,7 +1200,11 @@ where

let blocks = self.ready_blocks();

(!blocks.is_empty()).then(|| self.validate_and_queue_blocks(blocks, false))
if !blocks.is_empty() {
let ImportBlocksAction { origin, blocks } =
self.validate_and_queue_blocks(blocks, false);
self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this not happening inside of validate_and_queue_blocks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — now all actions are pushed at the place they are generated. Except BadPeer errors — those are sometime deferred until the function returns to simplify the error handling.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, actually there is one function left returning actions — new_peer_inner. But we need to track in restart that it is not requesting any actions to cancel obsolete requests, so it's hard to make it not return actions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still need to fix tests 🙈

Copy link
Contributor Author

@dmitry-markin dmitry-markin Nov 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now unit tests are not that unit anymore due to results ending up in self.actions...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests are updated, but they became uglier because of the need to track actions pushed into self.actions instead of just checking return values.
@bkchr could you have a look again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to merge this to move forward, but we can revisit these changes later if you know better ways of testing structs with an output queue.

}
}

/// Get prometheus metrics.
Expand Down Expand Up @@ -1346,7 +1338,7 @@ where
/// Restart the sync process. This will reset all pending block requests and return an iterator
/// of new block requests to make to peers. Peers that were downloading finality data (i.e.
/// their state was `DownloadingJustification`) are unaffected and will stay in the same state.
fn restart(&mut self) -> impl Iterator<Item = Result<BlockRequestAction<B>, BadPeer>> + '_ {
fn restart(&mut self) -> impl Iterator<Item = ChainSyncAction<B>> + '_ {
self.blocks.clear();
if let Err(e) = self.reset_sync_start_point() {
warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}");
Expand Down Expand Up @@ -1378,13 +1370,13 @@ where
}

// handle peers that were in other states.
match self.new_peer(peer_id, p.best_hash, p.best_number) {
match self.new_peer_inner(peer_id, p.best_hash, p.best_number) {
// since the request is not a justification, remove it from pending responses
Ok(None) => Some(Ok(BlockRequestAction::RemoveStale { peer_id })),
Ok(None) => Some(ChainSyncAction::CancelBlockRequest { peer_id }),
// update the request if the new one is available
Ok(Some(request)) => Some(Ok(BlockRequestAction::SendRequest { peer_id, request })),
Ok(Some(request)) => Some(ChainSyncAction::SendBlockRequest { peer_id, request }),
// this implies that we need to drop pending response from the peer
Err(e) => Some(Err(e)),
Err(bad_peer) => Some(ChainSyncAction::DropPeer(bad_peer)),
}
})
}
Expand Down Expand Up @@ -1534,13 +1526,12 @@ where
}

/// Submit blocks received in a response.
#[must_use]
pub fn on_block_response(
&mut self,
peer_id: PeerId,
request: BlockRequest<B>,
blocks: Vec<BlockData<B>>,
) -> OnBlockResponse<B> {
) {
let block_response = BlockResponse::<B> { id: request.id, blocks };

let blocks_range = || match (
Expand All @@ -1565,39 +1556,35 @@ where

if request.fields == BlockAttributes::JUSTIFICATION {
match self.on_block_justification(peer_id, block_response) {
Ok(OnBlockJustification::Nothing) => OnBlockResponse::Nothing,
Ok(OnBlockJustification::Nothing) => {},
Ok(OnBlockJustification::Import { peer_id, hash, number, justifications }) =>
OnBlockResponse::ImportJustifications(ImportJustificationsAction {
self.actions.push(ChainSyncAction::ImportJustifications {
peer_id,
hash,
number,
justifications,
}),
Err(bad_peer) => OnBlockResponse::DisconnectPeer(bad_peer),
Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)),
}
} else {
match self.on_block_data(&peer_id, Some(request), block_response) {
Ok(OnBlockData::Import(action)) => OnBlockResponse::ImportBlocks(action),
Ok(OnBlockData::Import(ImportBlocksAction { origin, blocks })) =>
self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks }),
Ok(OnBlockData::Request(peer_id, request)) =>
OnBlockResponse::SendBlockRequest { peer_id, request },
Ok(OnBlockData::Continue) => OnBlockResponse::Nothing,
Err(bad_peer) => OnBlockResponse::DisconnectPeer(bad_peer),
self.actions.push(ChainSyncAction::SendBlockRequest { peer_id, request }),
Ok(OnBlockData::Continue) => {},
Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)),
}
}
}

/// Submit a state received in a response.
#[must_use]
pub fn on_state_response(
&mut self,
peer_id: PeerId,
response: OpaqueStateResponse,
) -> OnStateResponse<B> {
pub fn on_state_response(&mut self, peer_id: PeerId, response: OpaqueStateResponse) {
match self.on_state_data(&peer_id, response) {
Ok(OnStateData::Import(origin, block)) =>
OnStateResponse::ImportBlocks(ImportBlocksAction { origin, blocks: vec![block] }),
Ok(OnStateData::Continue) => OnStateResponse::Nothing,
Err(bad_peer) => OnStateResponse::DisconnectPeer(bad_peer),
self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: vec![block] }),
Ok(OnStateData::Continue) => {},
Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)),
}
}

Expand Down Expand Up @@ -1903,12 +1890,7 @@ where
}

/// Submit a warp proof response received.
#[must_use]
pub fn on_warp_sync_response(
&mut self,
peer_id: &PeerId,
response: EncodedProof,
) -> Result<(), BadPeer> {
pub fn on_warp_sync_response(&mut self, peer_id: &PeerId, response: EncodedProof) {
if let Some(peer) = self.peers.get_mut(peer_id) {
if let PeerSyncState::DownloadingWarpProof = peer.state {
peer.state = PeerSyncState::Available;
Expand All @@ -1925,14 +1907,16 @@ where
sync.import_warp_proof(response)
} else {
debug!(target: LOG_TARGET, "Ignored obsolete warp sync response from {peer_id}");
return Err(BadPeer(*peer_id, rep::NOT_REQUESTED))
self.actions
.push(ChainSyncAction::DropPeer(BadPeer(*peer_id, rep::NOT_REQUESTED)));
return
};

match import_result {
WarpProofImportResult::Success => Ok(()),
WarpProofImportResult::Success => {},
WarpProofImportResult::BadResponse => {
debug!(target: LOG_TARGET, "Bad proof data received from {peer_id}");
Err(BadPeer(*peer_id, rep::BAD_BLOCK))
self.actions.push(ChainSyncAction::DropPeer(BadPeer(*peer_id, rep::BAD_BLOCK)));
},
}
}
Expand All @@ -1942,13 +1926,12 @@ where
/// Call this when a batch of blocks have been processed by the import
/// queue, with or without errors. If an error is returned, the pending response
/// from the peer must be dropped.
#[must_use]
pub fn on_blocks_processed(
&mut self,
imported: usize,
count: usize,
results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
) -> Box<dyn Iterator<Item = Result<BlockRequestAction<B>, BadPeer>>> {
) {
trace!(target: LOG_TARGET, "Imported {imported} of {count}");

let mut output = Vec::new();
Expand Down Expand Up @@ -1993,7 +1976,10 @@ where
if aux.bad_justification {
if let Some(ref peer) = peer_id {
warn!("💔 Sent block with bad justification to import");
output.push(Err(BadPeer(*peer, rep::BAD_JUSTIFICATION)));
output.push(ChainSyncAction::DropPeer(BadPeer(
*peer,
rep::BAD_JUSTIFICATION,
)));
}
}

Expand Down Expand Up @@ -2042,7 +2028,8 @@ where
target: LOG_TARGET,
"💔 Peer sent block with incomplete header to import",
);
output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER)));
output
.push(ChainSyncAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
output.extend(self.restart());
},
Err(BlockImportError::VerificationFailed(peer_id, e)) => {
Expand All @@ -2055,7 +2042,8 @@ where
);

if let Some(peer) = peer_id {
output.push(Err(BadPeer(peer, rep::VERIFICATION_FAIL)));
output
.push(ChainSyncAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL)));
}

output.extend(self.restart());
Expand All @@ -2066,7 +2054,7 @@ where
target: LOG_TARGET,
"💔 Block {hash:?} received from peer {peer} has been blacklisted",
);
output.push(Err(BadPeer(peer, rep::BAD_BLOCK)));
output.push(ChainSyncAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
},
Err(BlockImportError::MissingState) => {
// This may happen if the chain we were requesting upon has been discarded
Expand All @@ -2085,7 +2073,14 @@ where
}

self.allowed_requests.set_all();
Box::new(output.into_iter())

self.actions.append(&mut output);
}

/// Get pending actions to perform.
#[must_use]
pub fn take_actions(&mut self) -> impl Iterator<Item = ChainSyncAction<B>> {
std::mem::take(&mut self.actions).into_iter()
}
}

Expand Down
Loading