From 422fca4bd6c756522250f2a5c0cd73b093318ba8 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 6 Nov 2023 18:25:22 +0200 Subject: [PATCH 1/8] Unify `ChainSync` actions under one enum --- .../client/network/sync/src/chain_sync.rs | 156 +++++++++--------- .../network/sync/src/chain_sync/test.rs | 64 ++++--- substrate/client/network/sync/src/engine.rs | 132 ++++++--------- 3 files changed, 150 insertions(+), 202 deletions(-) diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index 858125f93f1fd..3da0cb5a005de 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -184,15 +184,6 @@ struct GapSync { target: NumberFor, } -/// Action that the parent of [`ChainSync`] should perform after reporting imported blocks with -/// [`ChainSync::on_blocks_processed`]. -pub enum BlockRequestAction { - /// Send block request to peer. Always implies dropping a stale block request to the same peer. - SendRequest { peer_id: PeerId, request: BlockRequest }, - /// Drop stale block request. - RemoveStale { peer_id: PeerId }, -} - /// Action that the parent of [`ChainSync`] should perform if we want to import blocks. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ImportBlocksAction { @@ -200,14 +191,6 @@ pub struct ImportBlocksAction { pub blocks: Vec>, } -/// Action that the parent of [`ChainSync`] should perform if we want to import justifications. -pub struct ImportJustificationsAction { - pub peer_id: PeerId, - pub hash: B::Hash, - pub number: NumberFor, - pub justifications: Justifications, -} - /// Result of [`ChainSync::on_block_data`]. #[derive(Debug, Clone, PartialEq, Eq)] enum OnBlockData { @@ -242,30 +225,23 @@ enum OnStateData { Continue, } -/// Action that the parent of [`ChainSync`] should perform after reporting block response with -/// [`ChainSync::on_block_response`]. -pub enum OnBlockResponse { - /// Nothing to do. - Nothing, - /// Perform block request. +/// Action that the parent of [`ChainSync`] should perform after reporting a network or block event. +pub enum ChainSyncAction { + /// Send block request to peer. Always implies dropping a stale block request to the same peer. SendBlockRequest { peer_id: PeerId, request: BlockRequest }, + /// Drop stale block request. + CancelBlockRequest { peer_id: PeerId }, + /// Peer misbehaved. Disconnect, report it and cancel the block request to it. + DropPeer(BadPeer), /// Import blocks. - ImportBlocks(ImportBlocksAction), + ImportBlocks { origin: BlockOrigin, blocks: Vec> }, /// Import justifications. - ImportJustifications(ImportJustificationsAction), - /// Invalid block response, the peer should be disconnected and reported. - DisconnectPeer(BadPeer), -} - -/// Action that the parent of [`ChainSync`] should perform after reporting state response with -/// [`ChainSync::on_state_response`]. -pub enum OnStateResponse { - /// Nothing to do. - Nothing, - /// Import blocks. - ImportBlocks(ImportBlocksAction), - /// Invalid state response, the peer should be disconnected and reported. - DisconnectPeer(BadPeer), + ImportJustifications { + peer_id: PeerId, + hash: B::Hash, + number: NumberFor, + justifications: Justifications, + }, } /// The main data structure which contains all the state for a chains @@ -313,6 +289,8 @@ pub struct ChainSync { import_existing: bool, /// Gap download process. gap_sync: Option>, + /// Pending actions. 
+ actions: Vec>, } /// All the data we have about a Peer that we are trying to sync with @@ -427,6 +405,7 @@ where gap_sync: None, warp_sync_config, warp_sync_target_block_header: None, + actions: Vec::new(), }; sync.reset_sync_start_point()?; @@ -509,8 +488,17 @@ where } /// Notify syncing state machine that a new sync peer has connected. + pub fn new_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { + match self.new_peer_inner(peer_id, best_hash, best_number) { + Ok(Some(request)) => + self.actions.push(ChainSyncAction::SendBlockRequest { peer_id, request }), + Ok(None) => {}, + Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)), + } + } + #[must_use] - pub fn new_peer( + fn new_peer_inner( &mut self, peer_id: PeerId, best_hash: B::Hash, @@ -1196,8 +1184,7 @@ where } /// Notify that a sync peer has disconnected. - #[must_use] - pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Option> { + pub fn peer_disconnected(&mut self, peer_id: &PeerId) { self.blocks.clear_peer_download(peer_id); if let Some(gap_sync) = &mut self.gap_sync { gap_sync.blocks.clear_peer_download(peer_id) @@ -1212,7 +1199,11 @@ where let blocks = self.ready_blocks(); - (!blocks.is_empty()).then(|| self.validate_and_queue_blocks(blocks, false)) + if !blocks.is_empty() { + let ImportBlocksAction { origin, blocks } = + self.validate_and_queue_blocks(blocks, false); + self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks }) + } } /// Get prometheus metrics. @@ -1346,7 +1337,7 @@ where /// Restart the sync process. This will reset all pending block requests and return an iterator /// of new block requests to make to peers. Peers that were downloading finality data (i.e. /// their state was `DownloadingJustification`) are unaffected and will stay in the same state. - fn restart(&mut self) -> impl Iterator, BadPeer>> + '_ { + fn restart(&mut self) -> impl Iterator> + '_ { self.blocks.clear(); if let Err(e) = self.reset_sync_start_point() { warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}"); @@ -1378,13 +1369,13 @@ where } // handle peers that were in other states. - match self.new_peer(peer_id, p.best_hash, p.best_number) { + match self.new_peer_inner(peer_id, p.best_hash, p.best_number) { // since the request is not a justification, remove it from pending responses - Ok(None) => Some(Ok(BlockRequestAction::RemoveStale { peer_id })), + Ok(None) => Some(ChainSyncAction::CancelBlockRequest { peer_id }), // update the request if the new one is available - Ok(Some(request)) => Some(Ok(BlockRequestAction::SendRequest { peer_id, request })), + Ok(Some(request)) => Some(ChainSyncAction::SendBlockRequest { peer_id, request }), // this implies that we need to drop pending response from the peer - Err(e) => Some(Err(e)), + Err(bad_peer) => Some(ChainSyncAction::DropPeer(bad_peer)), } }) } @@ -1534,13 +1525,12 @@ where } /// Submit blocks received in a response. 
- #[must_use] pub fn on_block_response( &mut self, peer_id: PeerId, request: BlockRequest, blocks: Vec>, - ) -> OnBlockResponse { + ) { let block_response = BlockResponse:: { id: request.id, blocks }; let blocks_range = || match ( @@ -1565,39 +1555,35 @@ where if request.fields == BlockAttributes::JUSTIFICATION { match self.on_block_justification(peer_id, block_response) { - Ok(OnBlockJustification::Nothing) => OnBlockResponse::Nothing, + Ok(OnBlockJustification::Nothing) => {}, Ok(OnBlockJustification::Import { peer_id, hash, number, justifications }) => - OnBlockResponse::ImportJustifications(ImportJustificationsAction { + self.actions.push(ChainSyncAction::ImportJustifications { peer_id, hash, number, justifications, }), - Err(bad_peer) => OnBlockResponse::DisconnectPeer(bad_peer), + Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)), } } else { match self.on_block_data(&peer_id, Some(request), block_response) { - Ok(OnBlockData::Import(action)) => OnBlockResponse::ImportBlocks(action), + Ok(OnBlockData::Import(ImportBlocksAction { origin, blocks })) => + self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks }), Ok(OnBlockData::Request(peer_id, request)) => - OnBlockResponse::SendBlockRequest { peer_id, request }, - Ok(OnBlockData::Continue) => OnBlockResponse::Nothing, - Err(bad_peer) => OnBlockResponse::DisconnectPeer(bad_peer), + self.actions.push(ChainSyncAction::SendBlockRequest { peer_id, request }), + Ok(OnBlockData::Continue) => {}, + Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)), } } } /// Submit a state received in a response. - #[must_use] - pub fn on_state_response( - &mut self, - peer_id: PeerId, - response: OpaqueStateResponse, - ) -> OnStateResponse { + pub fn on_state_response(&mut self, peer_id: PeerId, response: OpaqueStateResponse) { match self.on_state_data(&peer_id, response) { Ok(OnStateData::Import(origin, block)) => - OnStateResponse::ImportBlocks(ImportBlocksAction { origin, blocks: vec![block] }), - Ok(OnStateData::Continue) => OnStateResponse::Nothing, - Err(bad_peer) => OnStateResponse::DisconnectPeer(bad_peer), + self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: vec![block] }), + Ok(OnStateData::Continue) => {}, + Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)), } } @@ -1903,12 +1889,7 @@ where } /// Submit a warp proof response received. 
- #[must_use] - pub fn on_warp_sync_response( - &mut self, - peer_id: &PeerId, - response: EncodedProof, - ) -> Result<(), BadPeer> { + pub fn on_warp_sync_response(&mut self, peer_id: &PeerId, response: EncodedProof) { if let Some(peer) = self.peers.get_mut(peer_id) { if let PeerSyncState::DownloadingWarpProof = peer.state { peer.state = PeerSyncState::Available; @@ -1925,14 +1906,16 @@ where sync.import_warp_proof(response) } else { debug!(target: LOG_TARGET, "Ignored obsolete warp sync response from {peer_id}"); - return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) + self.actions + .push(ChainSyncAction::DropPeer(BadPeer(*peer_id, rep::NOT_REQUESTED))); + return }; match import_result { - WarpProofImportResult::Success => Ok(()), + WarpProofImportResult::Success => {}, WarpProofImportResult::BadResponse => { debug!(target: LOG_TARGET, "Bad proof data received from {peer_id}"); - Err(BadPeer(*peer_id, rep::BAD_BLOCK)) + self.actions.push(ChainSyncAction::DropPeer(BadPeer(*peer_id, rep::BAD_BLOCK))); }, } } @@ -1942,13 +1925,12 @@ where /// Call this when a batch of blocks have been processed by the import /// queue, with or without errors. If an error is returned, the pending response /// from the peer must be dropped. - #[must_use] pub fn on_blocks_processed( &mut self, imported: usize, count: usize, results: Vec<(Result>, BlockImportError>, B::Hash)>, - ) -> Box, BadPeer>>> { + ) { trace!(target: LOG_TARGET, "Imported {imported} of {count}"); let mut output = Vec::new(); @@ -1993,7 +1975,10 @@ where if aux.bad_justification { if let Some(ref peer) = peer_id { warn!("💔 Sent block with bad justification to import"); - output.push(Err(BadPeer(*peer, rep::BAD_JUSTIFICATION))); + output.push(ChainSyncAction::DropPeer(BadPeer( + *peer, + rep::BAD_JUSTIFICATION, + ))); } } @@ -2042,7 +2027,8 @@ where target: LOG_TARGET, "💔 Peer sent block with incomplete header to import", ); - output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER))); + output + .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER))); output.extend(self.restart()); }, Err(BlockImportError::VerificationFailed(peer_id, e)) => { @@ -2055,7 +2041,8 @@ where ); if let Some(peer) = peer_id { - output.push(Err(BadPeer(peer, rep::VERIFICATION_FAIL))); + output + .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL))); } output.extend(self.restart()); @@ -2066,7 +2053,7 @@ where target: LOG_TARGET, "💔 Block {hash:?} received from peer {peer} has been blacklisted", ); - output.push(Err(BadPeer(peer, rep::BAD_BLOCK))); + output.push(ChainSyncAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK))); }, Err(BlockImportError::MissingState) => { // This may happen if the chain we were requesting upon has been discarded @@ -2085,7 +2072,14 @@ where } self.allowed_requests.set_all(); - Box::new(output.into_iter()) + + self.actions.append(&mut output); + } + + /// Get pending actions to perform. 
+ #[must_use] + pub fn take_actions(&mut self) -> impl Iterator> { + std::mem::take(&mut self.actions).into_iter() } } diff --git a/substrate/client/network/sync/src/chain_sync/test.rs b/substrate/client/network/sync/src/chain_sync/test.rs index 2eefd2ad13ef8..a769ff9119ab9 100644 --- a/substrate/client/network/sync/src/chain_sync/test.rs +++ b/substrate/client/network/sync/src/chain_sync/test.rs @@ -53,7 +53,7 @@ fn processes_empty_response_on_justification_request_for_unknown_block() { }; // add a new peer with the same best block - sync.new_peer(peer_id, a1_hash, a1_number).unwrap(); + sync.new_peer(peer_id, a1_hash, a1_number); // and request a justification for the block sync.request_justification(&a1_hash, a1_number); @@ -119,8 +119,8 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { let (b1_hash, b1_number) = new_blocks(50); // add 2 peers at blocks that we don't have locally - sync.new_peer(peer_id1, Hash::random(), 42).unwrap(); - sync.new_peer(peer_id2, Hash::random(), 10).unwrap(); + sync.new_peer(peer_id1, Hash::random(), 42); + sync.new_peer(peer_id2, Hash::random(), 10); // we wil send block requests to these peers // for these blocks we don't know about @@ -130,7 +130,7 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { .all(|(p, _)| { p == peer_id1 || p == peer_id2 })); // add a new peer at a known block - sync.new_peer(peer_id3, b1_hash, b1_number).unwrap(); + sync.new_peer(peer_id3, b1_hash, b1_number); // we request a justification for a block we have locally sync.request_justification(&b1_hash, b1_number); @@ -149,13 +149,13 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { ); // we restart the sync state - let block_requests = sync.restart(); + let block_requests = sync.restart().collect::>(); // which should make us send out block requests to the first two peers - assert!(block_requests.map(|r| r.unwrap()).all(|event| match event { - BlockRequestAction::SendRequest { peer_id, .. } => - peer_id == peer_id1 || peer_id == peer_id2, - BlockRequestAction::RemoveStale { .. } => false, + assert!(block_requests.iter().all(|action| match action { + ChainSyncAction::SendBlockRequest { peer_id, .. 
} => + peer_id == &peer_id1 || peer_id == &peer_id2, + _ => false, })); // peer 3 should be unaffected it was downloading finality data @@ -280,9 +280,8 @@ fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() { let best_block = blocks.last().unwrap().clone(); let max_blocks_to_request = sync.max_blocks_per_request; // Connect the node we will sync from - sync.new_peer(peer_id1, best_block.hash(), *best_block.header().number()) - .unwrap(); - sync.new_peer(peer_id2, info.best_hash, 0).unwrap(); + sync.new_peer(peer_id1, best_block.hash(), *best_block.header().number()); + sync.new_peer(peer_id2, info.best_hash, 0); let mut best_block_num = 0; while best_block_num < MAX_DOWNLOAD_AHEAD { @@ -421,8 +420,7 @@ fn can_sync_huge_fork() { let common_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize / 2].clone(); // Connect the node we will sync from - sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()) - .unwrap(); + sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()); send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync); @@ -539,8 +537,7 @@ fn syncs_fork_without_duplicate_requests() { let common_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize / 2].clone(); // Connect the node we will sync from - sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()) - .unwrap(); + sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()); send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync); @@ -653,8 +650,7 @@ fn removes_target_fork_on_disconnect() { let peer_id1 = PeerId::random(); let common_block = blocks[1].clone(); // Connect the node we will sync from - sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()) - .unwrap(); + sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()); // Create a "new" header and announce it let mut header = blocks[0].header().clone(); @@ -678,8 +674,7 @@ fn can_import_response_with_missing_blocks() { let peer_id1 = PeerId::random(); let best_block = blocks[3].clone(); - sync.new_peer(peer_id1, best_block.hash(), *best_block.header().number()) - .unwrap(); + sync.new_peer(peer_id1, best_block.hash(), *best_block.header().number()); sync.peers.get_mut(&peer_id1).unwrap().state = PeerSyncState::Available; sync.peers.get_mut(&peer_id1).unwrap().common_number = 0; @@ -730,7 +725,7 @@ fn sync_restart_removes_block_but_not_justification_requests() { let (b1_hash, b1_number) = new_blocks(50); // add new peer and request blocks from them - sync.new_peer(peers[0], Hash::random(), 42).unwrap(); + sync.new_peer(peers[0], Hash::random(), 42); // we don't actually perform any requests, just keep track of peers waiting for a response let mut pending_responses = HashSet::new(); @@ -743,7 +738,7 @@ fn sync_restart_removes_block_but_not_justification_requests() { } // add a new peer at a known block - sync.new_peer(peers[1], b1_hash, b1_number).unwrap(); + sync.new_peer(peers[1], b1_hash, b1_number); // we request a justification for a block we have locally sync.request_justification(&b1_hash, b1_number); @@ -767,23 +762,24 @@ fn sync_restart_removes_block_but_not_justification_requests() { assert_eq!(pending_responses.len(), 2); // restart sync - let request_events = sync.restart().collect::>(); - for event in request_events.iter() { - match event.as_ref().unwrap() { - BlockRequestAction::RemoveStale { peer_id } => { + let actions = sync.restart().collect::>(); + for 
action in actions.iter() { + match action { + ChainSyncAction::CancelBlockRequest { peer_id } => { pending_responses.remove(&peer_id); }, - BlockRequestAction::SendRequest { peer_id, .. } => { + ChainSyncAction::SendBlockRequest { peer_id, .. } => { // we drop obsolete response, but don't register a new request, it's checked in // the `assert!` below pending_responses.remove(&peer_id); }, + _ => panic!("Unexpected action"), } } - assert!(request_events.iter().any(|event| { - match event.as_ref().unwrap() { - BlockRequestAction::RemoveStale { .. } => false, - BlockRequestAction::SendRequest { peer_id, .. } => peer_id == &peers[0], + assert!(actions.iter().any(|action| { + match action { + ChainSyncAction::SendBlockRequest { peer_id, .. } => peer_id == &peers[0], + _ => false, } })); @@ -848,11 +844,9 @@ fn request_across_forks() { // Add the peers, all at the common ancestor 100. let common_block = blocks.last().unwrap(); let peer_id1 = PeerId::random(); - sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()) - .unwrap(); + sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()); let peer_id2 = PeerId::random(); - sync.new_peer(peer_id2, common_block.hash(), *common_block.header().number()) - .unwrap(); + sync.new_peer(peer_id2, common_block.hash(), *common_block.header().number()); // Peer 1 announces 107 from fork 1, 100-107 get downloaded. { diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 560887132e3a8..626ade635cd15 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -25,10 +25,7 @@ use crate::{ }, block_relay_protocol::{BlockDownloader, BlockResponseError}, block_request_handler::MAX_BLOCKS_IN_RESPONSE, - chain_sync::{ - BlockRequestAction, ChainSync, ImportBlocksAction, ImportJustificationsAction, - OnBlockResponse, OnStateResponse, - }, + chain_sync::{ChainSync, ChainSyncAction}, pending_responses::{PendingResponses, ResponseEvent}, schema::v1::{StateRequest, StateResponse}, service::{ @@ -58,7 +55,7 @@ use schnellru::{ByLength, LruMap}; use tokio::time::{Interval, MissedTickBehavior}; use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider}; -use sc_consensus::import_queue::ImportQueueService; +use sc_consensus::{import_queue::ImportQueueService, IncomingBlock}; use sc_network::{ config::{ FullNetworkConfiguration, NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, @@ -74,8 +71,11 @@ use sc_network_common::{ }; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_blockchain::{Error as ClientError, HeaderMetadata}; -use sp_consensus::block_validation::BlockAnnounceValidator; -use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero}; +use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin}; +use sp_runtime::{ + traits::{Block as BlockT, Header, NumberFor, Zero}, + Justifications, +}; use std::{ collections::{HashMap, HashSet}, @@ -713,11 +713,36 @@ where self.is_major_syncing .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed); + // Perform actions requested by `ChainSync` in during `select!`. + self.perform_chain_sync_actions(); + // Send outbound requests on `ChanSync`'s behalf. 
self.send_chain_sync_requests(); } } + fn perform_chain_sync_actions(&mut self) { + self.chain_sync.take_actions().for_each(|action| match action { + ChainSyncAction::SendBlockRequest { peer_id, request } => { + // drop obsolete pending response first + self.pending_responses.remove(&peer_id); + self.send_block_request(peer_id, request); + }, + ChainSyncAction::CancelBlockRequest { peer_id } => { + self.pending_responses.remove(&peer_id); + }, + ChainSyncAction::DropPeer(BadPeer(peer_id, rep)) => { + self.pending_responses.remove(&peer_id); + self.network_service + .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); + self.network_service.report_peer(peer_id, rep); + }, + ChainSyncAction::ImportBlocks { origin, blocks } => self.import_blocks(origin, blocks), + ChainSyncAction::ImportJustifications { peer_id, hash, number, justifications } => + self.import_justifications(peer_id, hash, number, justifications), + }); + } + fn perform_periodic_actions(&mut self) { self.report_metrics(); @@ -766,28 +791,7 @@ where ToServiceCommand::ClearJustificationRequests => self.chain_sync.clear_justification_requests(), ToServiceCommand::BlocksProcessed(imported, count, results) => { - for result in self.chain_sync.on_blocks_processed(imported, count, results) { - match result { - Ok(action) => match action { - BlockRequestAction::SendRequest { peer_id, request } => { - // drop obsolete pending response first - self.pending_responses.remove(&peer_id); - self.send_block_request(peer_id, request); - }, - BlockRequestAction::RemoveStale { peer_id } => { - self.pending_responses.remove(&peer_id); - }, - }, - Err(BadPeer(peer_id, repu)) => { - self.pending_responses.remove(&peer_id); - self.network_service.disconnect_peer( - peer_id, - self.block_announce_protocol_name.clone(), - ); - self.network_service.report_peer(peer_id, repu) - }, - } - } + self.chain_sync.on_blocks_processed(imported, count, results); }, ToServiceCommand::JustificationImported(peer_id, hash, number, success) => { self.chain_sync.on_justification_import(hash, number, success); @@ -940,9 +944,7 @@ where } } - if let Some(import_blocks_action) = self.chain_sync.peer_disconnected(&peer_id) { - self.import_blocks(import_blocks_action) - } + self.chain_sync.peer_disconnected(&peer_id); self.pending_responses.remove(&peer_id); self.event_streams.retain(|stream| { @@ -1053,17 +1055,7 @@ where inbound, }; - let req = if peer.info.roles.is_full() { - match self.chain_sync.new_peer(peer_id, peer.info.best_hash, peer.info.best_number) { - Ok(req) => req, - Err(BadPeer(id, repu)) => { - self.network_service.report_peer(id, repu); - return Err(()) - }, - } - } else { - None - }; + self.chain_sync.new_peer(peer_id, peer.info.best_hash, peer.info.best_number); log::debug!(target: LOG_TARGET, "Connected {peer_id}"); @@ -1075,10 +1067,6 @@ where self.num_in_peers += 1; } - if let Some(req) = req { - self.send_block_request(peer_id, req); - } - self.event_streams .retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok()); @@ -1202,22 +1190,7 @@ where PeerRequest::Block(req) => { match self.block_downloader.block_response_into_blocks(&req, resp) { Ok(blocks) => { - match self.chain_sync.on_block_response(peer_id, req, blocks) { - OnBlockResponse::SendBlockRequest { peer_id, request } => - self.send_block_request(peer_id, request), - OnBlockResponse::ImportBlocks(import_blocks_action) => - self.import_blocks(import_blocks_action), - OnBlockResponse::ImportJustifications(action) => - 
self.import_justifications(action), - OnBlockResponse::Nothing => {}, - OnBlockResponse::DisconnectPeer(BadPeer(peer_id, rep)) => { - self.network_service.disconnect_peer( - peer_id, - self.block_announce_protocol_name.clone(), - ); - self.network_service.report_peer(peer_id, rep); - }, - } + self.chain_sync.on_block_response(peer_id, req, blocks); }, Err(BlockResponseError::DecodeFailed(e)) => { debug!( @@ -1262,27 +1235,10 @@ where }, }; - match self.chain_sync.on_state_response(peer_id, response) { - OnStateResponse::ImportBlocks(import_blocks_action) => - self.import_blocks(import_blocks_action), - OnStateResponse::DisconnectPeer(BadPeer(peer_id, rep)) => { - self.network_service.disconnect_peer( - peer_id, - self.block_announce_protocol_name.clone(), - ); - self.network_service.report_peer(peer_id, rep); - }, - OnStateResponse::Nothing => {}, - } + self.chain_sync.on_state_response(peer_id, response); }, PeerRequest::WarpProof => { - if let Err(BadPeer(peer_id, rep)) = - self.chain_sync.on_warp_sync_response(&peer_id, EncodedProof(resp)) - { - self.network_service - .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); - self.network_service.report_peer(peer_id, rep); - } + self.chain_sync.on_warp_sync_response(&peer_id, EncodedProof(resp)); }, }, Ok(Err(e)) => { @@ -1388,7 +1344,7 @@ where } /// Import blocks. - fn import_blocks(&mut self, ImportBlocksAction { origin, blocks }: ImportBlocksAction) { + fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { if let Some(metrics) = &self.metrics { metrics.import_queue_blocks_submitted.inc(); } @@ -1397,13 +1353,17 @@ where } /// Import justifications. - fn import_justifications(&mut self, action: ImportJustificationsAction) { + fn import_justifications( + &mut self, + peer_id: PeerId, + hash: B::Hash, + number: NumberFor, + justifications: Justifications, + ) { if let Some(metrics) = &self.metrics { metrics.import_queue_justifications_submitted.inc(); } - let ImportJustificationsAction { peer_id, hash, number, justifications } = action; - self.import_queue.import_justifications(peer_id, hash, number, justifications); } } From 37a488a192f19381c176fb91a7c37f4b3b311ff6 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 7 Nov 2023 15:06:13 +0200 Subject: [PATCH 2/8] minor: make `ImportBlocksAction` private --- substrate/client/network/sync/src/chain_sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index 3da0cb5a005de..d878d64a189b4 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -186,7 +186,7 @@ struct GapSync { /// Action that the parent of [`ChainSync`] should perform if we want to import blocks. 
#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ImportBlocksAction { +struct ImportBlocksAction { pub origin: BlockOrigin, pub blocks: Vec>, } From f7d689364fb379c2be29a2d0b3c586511e89d4ab Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 9 Nov 2023 12:06:17 +0200 Subject: [PATCH 3/8] Log actions in `SyncingEngine::process_chain_sync_actions` --- .../client/network/sync/src/chain_sync.rs | 1 + substrate/client/network/sync/src/engine.rs | 50 +++++++++++++++---- 2 files changed, 41 insertions(+), 10 deletions(-) diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index d878d64a189b4..f7de98afa4296 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -226,6 +226,7 @@ enum OnStateData { } /// Action that the parent of [`ChainSync`] should perform after reporting a network or block event. +#[derive(Debug)] pub enum ChainSyncAction { /// Send block request to peer. Always implies dropping a stale block request to the same peer. SendBlockRequest { peer_id: PeerId, request: BlockRequest }, diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 626ade635cd15..99363ae472039 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -713,33 +713,63 @@ where self.is_major_syncing .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed); - // Perform actions requested by `ChainSync` in during `select!`. - self.perform_chain_sync_actions(); + // Process actions requested by `ChainSync` during `select!`. + self.process_chain_sync_actions(); // Send outbound requests on `ChanSync`'s behalf. self.send_chain_sync_requests(); } } - fn perform_chain_sync_actions(&mut self) { + fn process_chain_sync_actions(&mut self) { self.chain_sync.take_actions().for_each(|action| match action { ChainSyncAction::SendBlockRequest { peer_id, request } => { - // drop obsolete pending response first - self.pending_responses.remove(&peer_id); - self.send_block_request(peer_id, request); + // Sending block request implies dropping obsolete pending response as we are not + // interested in it anymore (see [`ChainSyncAction::SendBlockRequest`]). 
+ let removed = self.pending_responses.remove(&peer_id); + self.send_block_request(peer_id, request.clone()); + + trace!( + target: LOG_TARGET, + "Processed `ChainSyncAction::SendBlockRequest` to {} with {:?}, stale response removed: {}.", + peer_id, + request, + removed, + ) }, ChainSyncAction::CancelBlockRequest { peer_id } => { - self.pending_responses.remove(&peer_id); + let removed = self.pending_responses.remove(&peer_id); + + trace!(target: LOG_TARGET, "Processed {action:?}., response removed: {removed}."); }, ChainSyncAction::DropPeer(BadPeer(peer_id, rep)) => { self.pending_responses.remove(&peer_id); self.network_service .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); self.network_service.report_peer(peer_id, rep); + + trace!(target: LOG_TARGET, "Processed {action:?}."); + }, + ChainSyncAction::ImportBlocks { origin, blocks } => { + let count = blocks.len(); + self.import_blocks(origin, blocks); + + trace!( + target: LOG_TARGET, + "Processed `ChainSyncAction::ImportBlocks` with {count} blocks.", + ); + }, + ChainSyncAction::ImportJustifications { peer_id, hash, number, justifications } => { + self.import_justifications(peer_id, hash, number, justifications); + + trace!( + target: LOG_TARGET, + "Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).", + peer_id, + hash, + number, + ) }, - ChainSyncAction::ImportBlocks { origin, blocks } => self.import_blocks(origin, blocks), - ChainSyncAction::ImportJustifications { peer_id, hash, number, justifications } => - self.import_justifications(peer_id, hash, number, justifications), }); } From affd3423df472a55b872674ca78c299699a31a36 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 9 Nov 2023 12:18:25 +0200 Subject: [PATCH 4/8] minor: comment --- substrate/client/network/sync/src/engine.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 99363ae472039..58a9fdc49f209 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -726,6 +726,7 @@ where ChainSyncAction::SendBlockRequest { peer_id, request } => { // Sending block request implies dropping obsolete pending response as we are not // interested in it anymore (see [`ChainSyncAction::SendBlockRequest`]). + // Furthermore, only one request at a time is allowed to any peer. let removed = self.pending_responses.remove(&peer_id); self.send_block_request(peer_id, request.clone()); From b1f1fd0900afbd59ba481482c0a23b58b0230073 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 9 Nov 2023 16:32:51 +0200 Subject: [PATCH 5/8] Make no `ChainSync` methods return actions (except errors) --- .../client/network/sync/src/chain_sync.rs | 99 ++++++------------- 1 file changed, 32 insertions(+), 67 deletions(-) diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index f7de98afa4296..245e8e69be7cb 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -191,31 +191,6 @@ struct ImportBlocksAction { pub blocks: Vec>, } -/// Result of [`ChainSync::on_block_data`]. -#[derive(Debug, Clone, PartialEq, Eq)] -enum OnBlockData { - /// The block should be imported. - Import(ImportBlocksAction), - /// A new block request needs to be made to the given peer. - Request(PeerId, BlockRequest), - /// Continue processing events. 
- Continue, -} - -/// Result of [`ChainSync::on_block_justification`]. -#[derive(Debug, Clone, PartialEq, Eq)] -enum OnBlockJustification { - /// The justification needs no further handling. - Nothing, - /// The justification should be imported. - Import { - peer_id: PeerId, - hash: Block::Hash, - number: NumberFor, - justifications: Justifications, - }, -} - // Result of [`ChainSync::on_state_data`]. #[derive(Debug)] enum OnStateData { @@ -716,7 +691,7 @@ where peer_id: &PeerId, request: Option>, response: BlockResponse, - ) -> Result, BadPeer> { + ) -> Result<(), BadPeer> { self.downloaded_blocks += response.blocks.len(); let mut gap = false; let new_blocks: Vec> = if let Some(peer) = self.peers.get_mut(peer_id) { @@ -881,10 +856,12 @@ where start: *start, state: next_state, }; - return Ok(OnBlockData::Request( - *peer_id, - ancestry_request::(next_num), - )) + let request = ancestry_request::(next_num); + self.actions.push(ChainSyncAction::SendBlockRequest { + peer_id: *peer_id, + request, + }); + return Ok(()) } else { // Ancestry search is complete. Check if peer is on a stale fork unknown // to us and add it to sync targets if necessary. @@ -929,8 +906,7 @@ where match warp_sync.import_target_block( blocks.pop().expect("`blocks` len checked above."), ) { - warp::TargetBlockImportResult::Success => - return Ok(OnBlockData::Continue), + warp::TargetBlockImportResult::Success => return Ok(()), warp::TargetBlockImportResult::BadResponse => return Err(BadPeer(*peer_id, rep::VERIFICATION_FAIL)), } @@ -952,7 +928,7 @@ where "Logic error: we think we are downloading warp target block from {}, but no warp sync is happening.", peer_id, ); - return Ok(OnBlockData::Continue) + return Ok(()) } }, PeerSyncState::Available | @@ -989,7 +965,9 @@ where return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) }; - Ok(OnBlockData::Import(self.validate_and_queue_blocks(new_blocks, gap))) + self.validate_and_queue_blocks(new_blocks, gap); + + Ok(()) } /// Submit a justification response for processing. @@ -998,7 +976,7 @@ where &mut self, peer_id: PeerId, response: BlockResponse, - ) -> Result, BadPeer> { + ) -> Result<(), BadPeer> { let peer = if let Some(peer) = self.peers.get_mut(&peer_id) { peer } else { @@ -1006,7 +984,7 @@ where target: LOG_TARGET, "💔 Called on_block_justification with a peer ID of an unknown peer", ); - return Ok(OnBlockJustification::Nothing) + return Ok(()) }; self.allowed_requests.add(&peer_id); @@ -1043,11 +1021,17 @@ where if let Some((peer_id, hash, number, justifications)) = self.extra_justifications.on_response(peer_id, justification) { - return Ok(OnBlockJustification::Import { peer_id, hash, number, justifications }) + self.actions.push(ChainSyncAction::ImportJustifications { + peer_id, + hash, + number, + justifications, + }); + return Ok(()) } } - Ok(OnBlockJustification::Nothing) + Ok(()) } /// Report a justification import (successful or not). 
@@ -1201,9 +1185,7 @@ where let blocks = self.ready_blocks(); if !blocks.is_empty() { - let ImportBlocksAction { origin, blocks } = - self.validate_and_queue_blocks(blocks, false); - self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks }) + self.validate_and_queue_blocks(blocks, false); } } @@ -1251,11 +1233,7 @@ where } } - fn validate_and_queue_blocks( - &mut self, - mut new_blocks: Vec>, - gap: bool, - ) -> ImportBlocksAction { + fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec>, gap: bool) { let orig_len = new_blocks.len(); new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash)); if new_blocks.len() != orig_len { @@ -1287,7 +1265,7 @@ where } self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash)); - ImportBlocksAction { origin, blocks: new_blocks } + self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: new_blocks }) } fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor) { @@ -1554,27 +1532,14 @@ where blocks_range(), ); - if request.fields == BlockAttributes::JUSTIFICATION { - match self.on_block_justification(peer_id, block_response) { - Ok(OnBlockJustification::Nothing) => {}, - Ok(OnBlockJustification::Import { peer_id, hash, number, justifications }) => - self.actions.push(ChainSyncAction::ImportJustifications { - peer_id, - hash, - number, - justifications, - }), - Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)), - } + let res = if request.fields == BlockAttributes::JUSTIFICATION { + self.on_block_justification(peer_id, block_response) } else { - match self.on_block_data(&peer_id, Some(request), block_response) { - Ok(OnBlockData::Import(ImportBlocksAction { origin, blocks })) => - self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks }), - Ok(OnBlockData::Request(peer_id, request)) => - self.actions.push(ChainSyncAction::SendBlockRequest { peer_id, request }), - Ok(OnBlockData::Continue) => {}, - Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)), - } + self.on_block_data(&peer_id, Some(request), block_response) + }; + + if let Err(bad_peer) = res { + self.actions.push(ChainSyncAction::DropPeer(bad_peer)); } } From 11f8d8784e42069b7dcfdf08411568a06badfda6 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 9 Nov 2023 16:42:17 +0200 Subject: [PATCH 6/8] Make `on_state_data` push actions directly to `self.actions` --- .../client/network/sync/src/chain_sync.rs | 31 +++++-------------- 1 file changed, 7 insertions(+), 24 deletions(-) diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index 245e8e69be7cb..ca11d9f666d20 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -184,22 +184,6 @@ struct GapSync { target: NumberFor, } -/// Action that the parent of [`ChainSync`] should perform if we want to import blocks. -#[derive(Debug, Clone, PartialEq, Eq)] -struct ImportBlocksAction { - pub origin: BlockOrigin, - pub blocks: Vec>, -} - -// Result of [`ChainSync::on_state_data`]. -#[derive(Debug)] -enum OnStateData { - /// The block and state that should be imported. - Import(BlockOrigin, IncomingBlock), - /// A new state request needs to be made to the given peer. - Continue, -} - /// Action that the parent of [`ChainSync`] should perform after reporting a network or block event. #[derive(Debug)] pub enum ChainSyncAction { @@ -1545,11 +1529,8 @@ where /// Submit a state received in a response. 
pub fn on_state_response(&mut self, peer_id: PeerId, response: OpaqueStateResponse) { - match self.on_state_data(&peer_id, response) { - Ok(OnStateData::Import(origin, block)) => - self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: vec![block] }), - Ok(OnStateData::Continue) => {}, - Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)), + if let Err(bad_peer) = self.on_state_data(&peer_id, response) { + self.actions.push(ChainSyncAction::DropPeer(bad_peer)); } } @@ -1785,11 +1766,12 @@ where None } + #[must_use] fn on_state_data( &mut self, peer_id: &PeerId, response: OpaqueStateResponse, - ) -> Result, BadPeer> { + ) -> Result<(), BadPeer> { let response: Box = response.0.downcast().map_err(|_error| { error!( target: LOG_TARGET, @@ -1844,9 +1826,10 @@ where state: Some(state), }; debug!(target: LOG_TARGET, "State download is complete. Import is queued"); - Ok(OnStateData::Import(origin, block)) + self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: vec![block] }); + Ok(()) }, - ImportResult::Continue => Ok(OnStateData::Continue), + ImportResult::Continue => Ok(()), ImportResult::BadResponse => { debug!(target: LOG_TARGET, "Bad state data received from {peer_id}"); Err(BadPeer(*peer_id, rep::BAD_BLOCK)) From fc41efd4051c72aa00f557478b156189c23641fc Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 9 Nov 2023 16:58:23 +0200 Subject: [PATCH 7/8] Make `restart` push actions directly to `self.actions` --- .../client/network/sync/src/chain_sync.rs | 42 +++++++++---------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index ca11d9f666d20..ee57711e36104 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -1300,7 +1300,7 @@ where /// Restart the sync process. This will reset all pending block requests and return an iterator /// of new block requests to make to peers. Peers that were downloading finality data (i.e. /// their state was `DownloadingJustification`) are unaffected and will stay in the same state. - fn restart(&mut self) -> impl Iterator> + '_ { + fn restart(&mut self) { self.blocks.clear(); if let Err(e) = self.reset_sync_start_point() { warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}"); @@ -1314,7 +1314,7 @@ where ); let old_peers = std::mem::take(&mut self.peers); - old_peers.into_iter().filter_map(move |(peer_id, mut p)| { + old_peers.into_iter().for_each(|(peer_id, mut p)| { // peers that were downloading justifications // should be kept in that state. if let PeerSyncState::DownloadingJustification(_) = p.state { @@ -1328,19 +1328,21 @@ where ); p.common_number = self.best_queued_number; self.peers.insert(peer_id, p); - return None + return } // handle peers that were in other states. 
- match self.new_peer_inner(peer_id, p.best_hash, p.best_number) { + let action = match self.new_peer_inner(peer_id, p.best_hash, p.best_number) { // since the request is not a justification, remove it from pending responses - Ok(None) => Some(ChainSyncAction::CancelBlockRequest { peer_id }), + Ok(None) => ChainSyncAction::CancelBlockRequest { peer_id }, // update the request if the new one is available - Ok(Some(request)) => Some(ChainSyncAction::SendBlockRequest { peer_id, request }), + Ok(Some(request)) => ChainSyncAction::SendBlockRequest { peer_id, request }, // this implies that we need to drop pending response from the peer - Err(bad_peer) => Some(ChainSyncAction::DropPeer(bad_peer)), - } - }) + Err(bad_peer) => ChainSyncAction::DropPeer(bad_peer), + }; + + self.actions.push(action); + }); } /// Find a block to start sync from. If we sync with state, that's the latest block we have @@ -1882,8 +1884,6 @@ where ) { trace!(target: LOG_TARGET, "Imported {imported} of {count}"); - let mut output = Vec::new(); - let mut has_error = false; for (_, hash) in &results { self.queue_blocks.remove(hash); @@ -1924,7 +1924,7 @@ where if aux.bad_justification { if let Some(ref peer) = peer_id { warn!("💔 Sent block with bad justification to import"); - output.push(ChainSyncAction::DropPeer(BadPeer( + self.actions.push(ChainSyncAction::DropPeer(BadPeer( *peer, rep::BAD_JUSTIFICATION, ))); @@ -1944,7 +1944,7 @@ where ); self.state_sync = None; self.mode = SyncMode::Full; - output.extend(self.restart()); + self.restart(); } let warp_sync_complete = self .warp_sync @@ -1958,7 +1958,7 @@ where ); self.warp_sync = None; self.mode = SyncMode::Full; - output.extend(self.restart()); + self.restart(); } let gap_sync_complete = self.gap_sync.as_ref().map_or(false, |s| s.target == number); @@ -1976,9 +1976,9 @@ where target: LOG_TARGET, "💔 Peer sent block with incomplete header to import", ); - output + self.actions .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER))); - output.extend(self.restart()); + self.restart(); }, Err(BlockImportError::VerificationFailed(peer_id, e)) => { let extra_message = peer_id @@ -1990,11 +1990,11 @@ where ); if let Some(peer) = peer_id { - output + self.actions .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL))); } - output.extend(self.restart()); + self.restart(); }, Err(BlockImportError::BadBlock(peer_id)) => if let Some(peer) = peer_id { @@ -2002,7 +2002,7 @@ where target: LOG_TARGET, "💔 Block {hash:?} received from peer {peer} has been blacklisted", ); - output.push(ChainSyncAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK))); + self.actions.push(ChainSyncAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK))); }, Err(BlockImportError::MissingState) => { // This may happen if the chain we were requesting upon has been discarded @@ -2014,15 +2014,13 @@ where warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err()); self.state_sync = None; self.warp_sync = None; - output.extend(self.restart()); + self.restart(); }, Err(BlockImportError::Cancelled) => {}, }; } self.allowed_requests.set_all(); - - self.actions.append(&mut output); } /// Get pending actions to perform. 
From cbee72af2a0ace40d1a40821162c46a47bc3bea8 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 10 Nov 2023 14:41:58 +0200 Subject: [PATCH 8/8] Update tests --- .../client/network/sync/src/chain_sync.rs | 2 +- .../network/sync/src/chain_sync/test.rs | 146 ++++++++++++------ 2 files changed, 104 insertions(+), 44 deletions(-) diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index ee57711e36104..2adc6d4234159 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -879,7 +879,7 @@ where .insert(*peer_id); } peer.state = PeerSyncState::Available; - Vec::new() + return Ok(()) } }, PeerSyncState::DownloadingWarpTargetBlock => { diff --git a/substrate/client/network/sync/src/chain_sync/test.rs b/substrate/client/network/sync/src/chain_sync/test.rs index a769ff9119ab9..15b2a95a07c87 100644 --- a/substrate/client/network/sync/src/chain_sync/test.rs +++ b/substrate/client/network/sync/src/chain_sync/test.rs @@ -74,10 +74,8 @@ fn processes_empty_response_on_justification_request_for_unknown_block() { // if the peer replies with an empty response (i.e. it doesn't know the block), // the active request should be cleared. - assert_eq!( - sync.on_block_justification(peer_id, BlockResponse:: { id: 0, blocks: vec![] }), - Ok(OnBlockJustification::Nothing), - ); + sync.on_block_justification(peer_id, BlockResponse:: { id: 0, blocks: vec![] }) + .unwrap(); // there should be no in-flight requests assert_eq!(sync.extra_justifications.active_requests().count(), 0); @@ -148,11 +146,16 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { PeerSyncState::DownloadingJustification(b1_hash), ); + // clear old actions + let _ = sync.take_actions(); + // we restart the sync state - let block_requests = sync.restart().collect::>(); + sync.restart(); + let actions = sync.take_actions().collect::>(); // which should make us send out block requests to the first two peers - assert!(block_requests.iter().all(|action| match action { + assert_eq!(actions.len(), 2); + assert!(actions.iter().all(|action| match action { ChainSyncAction::SendBlockRequest { peer_id, .. } => peer_id == &peer_id1 || peer_id == &peer_id2, _ => false, @@ -166,7 +169,7 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { // Set common block to something that we don't have (e.g. 
failed import) sync.peers.get_mut(&peer_id3).unwrap().common_number = 100; - let _ = sync.restart().count(); + sync.restart(); assert_eq!(sync.peers.get(&peer_id3).unwrap().common_number, 50); } @@ -299,11 +302,17 @@ fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() { let response = create_block_response(resp_blocks.clone()); - let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + // Clear old actions to not deal with them + let _ = sync.take_actions(); + + sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + + let actions = sync.take_actions().collect::>(); + assert_eq!(actions.len(), 1); assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == max_blocks_to_request as usize - ),); + &actions[0], + ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize, + )); best_block_num += max_blocks_to_request as u32; @@ -355,11 +364,14 @@ fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() { assert_eq!(FromBlock::Number(best_block_num as u64), peer2_req.from); let response = create_block_response(vec![blocks[(best_block_num - 1) as usize].clone()]); - let res = sync.on_block_data(&peer_id2, Some(peer2_req), response).unwrap(); - assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty() - ),); + + // Clear old actions to not deal with them + let _ = sync.take_actions(); + + sync.on_block_data(&peer_id2, Some(peer2_req), response).unwrap(); + + let actions = sync.take_actions().collect::>(); + assert!(actions.is_empty()); let peer1_from = unwrap_from_block_number(peer1_req.unwrap().from); @@ -427,17 +439,27 @@ fn can_sync_huge_fork() { let mut request = get_block_request(&mut sync, FromBlock::Number(info.best_number), 1, &peer_id1); + // Discard old actions we are not interested in + let _ = sync.take_actions(); + // Do the ancestor search loop { let block = &fork_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1]; let response = create_block_response(vec![block.clone()]); - let on_block_data = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); - request = if let OnBlockData::Request(_peer, request) = on_block_data { - request - } else { + sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + + let actions = sync.take_actions().collect::>(); + + request = if actions.is_empty() { // We found the ancenstor break + } else { + assert_eq!(actions.len(), 1); + match &actions[0] { + ChainSyncAction::SendBlockRequest { peer_id: _, request } => request.clone(), + action @ _ => panic!("Unexpected action: {action:?}"), + } }; log::trace!(target: LOG_TARGET, "Request: {request:?}"); @@ -461,15 +483,18 @@ fn can_sync_huge_fork() { let response = create_block_response(resp_blocks.clone()); - let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + + let actions = sync.take_actions().collect::>(); + assert_eq!(actions.len(), 1); assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == sync.max_blocks_per_request as usize - ),); + &actions[0], + ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == sync.max_blocks_per_request as usize + )); best_block_num += sync.max_blocks_per_request as u32; - let _ = sync.on_blocks_processed( + sync.on_blocks_processed( max_blocks_to_request as usize, 
max_blocks_to_request as usize, resp_blocks @@ -488,6 +513,9 @@ fn can_sync_huge_fork() { .collect(), ); + // Discard pending actions + let _ = sync.take_actions(); + resp_blocks .into_iter() .rev() @@ -544,17 +572,27 @@ fn syncs_fork_without_duplicate_requests() { let mut request = get_block_request(&mut sync, FromBlock::Number(info.best_number), 1, &peer_id1); + // Discard pending actions + let _ = sync.take_actions(); + // Do the ancestor search loop { let block = &fork_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1]; let response = create_block_response(vec![block.clone()]); - let on_block_data = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); - request = if let OnBlockData::Request(_peer, request) = on_block_data { - request - } else { + sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + + let actions = sync.take_actions().collect::>(); + + request = if actions.is_empty() { // We found the ancenstor break + } else { + assert_eq!(actions.len(), 1); + match &actions[0] { + ChainSyncAction::SendBlockRequest { peer_id: _, request } => request.clone(), + action @ _ => panic!("Unexpected action: {action:?}"), + } }; log::trace!(target: LOG_TARGET, "Request: {request:?}"); @@ -579,11 +617,17 @@ fn syncs_fork_without_duplicate_requests() { let response = create_block_response(resp_blocks.clone()); - let res = sync.on_block_data(&peer_id1, Some(request.clone()), response).unwrap(); + // Discard old actions + let _ = sync.take_actions(); + + sync.on_block_data(&peer_id1, Some(request.clone()), response).unwrap(); + + let actions = sync.take_actions().collect::>(); + assert_eq!(actions.len(), 1); assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == max_blocks_to_request as usize - ),); + &actions[0], + ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize + )); best_block_num += max_blocks_to_request as u32; @@ -761,8 +805,12 @@ fn sync_restart_removes_block_but_not_justification_requests() { ); assert_eq!(pending_responses.len(), 2); + // discard old actions + let _ = sync.take_actions(); + // restart sync - let actions = sync.restart().collect::>(); + sync.restart(); + let actions = sync.take_actions().collect::>(); for action in actions.iter() { match action { ChainSyncAction::CancelBlockRequest { peer_id } => { @@ -773,7 +821,7 @@ fn sync_restart_removes_block_but_not_justification_requests() { // the `assert!` below pending_responses.remove(&peer_id); }, - _ => panic!("Unexpected action"), + action @ _ => panic!("Unexpected action: {action:?}"), } } assert!(actions.iter().any(|action| { @@ -858,11 +906,17 @@ fn request_across_forks() { let mut resp_blocks = fork_a_blocks[100_usize..107_usize].to_vec(); resp_blocks.reverse(); let response = create_block_response(resp_blocks.clone()); - let res = sync.on_block_data(&peer, Some(request), response).unwrap(); + + // Drop old actions + let _ = sync.take_actions(); + + sync.on_block_data(&peer, Some(request), response).unwrap(); + let actions = sync.take_actions().collect::>(); + assert_eq!(actions.len(), 1); assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 7_usize - ),); + &actions[0], + ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 7_usize + )); assert_eq!(sync.best_queued_number, 107); assert_eq!(sync.best_queued_hash, block.hash()); assert!(sync.is_known(&block.header.parent_hash())); @@ -897,11 +951,17 @@ fn 
request_across_forks() { // block is announced. let request = get_block_request(&mut sync, FromBlock::Hash(block.hash()), 1, &peer); let response = create_block_response(vec![block.clone()]); - let res = sync.on_block_data(&peer, Some(request), response).unwrap(); + + // Drop old actions we are not going to check + let _ = sync.take_actions(); + + sync.on_block_data(&peer, Some(request), response).unwrap(); + let actions = sync.take_actions().collect::>(); + assert_eq!(actions.len(), 1); assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 1_usize - ),); + &actions[0], + ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 1_usize + )); assert!(sync.is_known(&block.header.parent_hash())); } }
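The test updates in this last patch share one shape: discard leftover actions from setup, trigger the handler under test, then drain the queue and pattern-match on it. Reusing the `StateMachine`/`Action` stand-ins from the sketch after the previous patch (not the real `ChainSync` types), that assertion style reads roughly as:

#[test]
fn bad_response_queues_a_drop_peer_action() {
    let mut sync = StateMachine::default();

    // Discard actions produced by earlier setup so the assertion only sees new ones.
    let _ = sync.take_actions();

    sync.on_block_response(7, false);

    // Assert on the drained action queue instead of on a per-call return value.
    let actions = sync.take_actions().collect::<Vec<_>>();
    assert_eq!(actions.len(), 1);
    assert!(matches!(actions[0], Action::DropPeer { peer: 7 }));
}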