diff --git a/core/consensus/src/adapter.rs b/core/consensus/src/adapter.rs index 6d0f07da6..28b141391 100644 --- a/core/consensus/src/adapter.rs +++ b/core/consensus/src/adapter.rs @@ -18,7 +18,7 @@ use core_network::{PeerId, PeerIdExt}; use protocol::traits::{ CommonConsensusAdapter, ConsensusAdapter, Context, ExecutorFactory, ExecutorParams, - ExecutorResp, Gossip, MemPool, MessageTarget, MixedTxHashes, PeerTrust, Priority, Rpc, + ExecutorResp, Gossip, MemPool, MessageTarget, MixedTxHashes, Network, PeerTrust, Priority, Rpc, ServiceMapping, Storage, SynchronizationAdapter, TrustFeedback, }; use protocol::types::{ @@ -45,7 +45,7 @@ const OVERLORD_GAP: usize = 10; pub struct OverlordConsensusAdapter< EF: ExecutorFactory, M: MemPool, - N: Rpc + PeerTrust + Gossip + 'static, + N: Rpc + PeerTrust + Gossip + Network + 'static, S: Storage, DB: cita_trie::DB, Mapping: ServiceMapping, @@ -68,7 +68,7 @@ impl ConsensusAdapter where EF: ExecutorFactory, M: MemPool + 'static, - N: Rpc + PeerTrust + Gossip + 'static, + N: Rpc + PeerTrust + Gossip + Network + 'static, S: Storage + 'static, DB: cita_trie::DB + 'static, Mapping: ServiceMapping + 'static, @@ -220,7 +220,7 @@ impl SynchronizationAdapter where EF: ExecutorFactory, M: MemPool + 'static, - N: Rpc + PeerTrust + Gossip + 'static, + N: Rpc + PeerTrust + Gossip + Network + 'static, S: Storage + 'static, DB: cita_trie::DB + 'static, Mapping: ServiceMapping + 'static, @@ -354,7 +354,7 @@ impl CommonConsensusAdapter where EF: ExecutorFactory, M: MemPool + 'static, - N: Rpc + PeerTrust + Gossip + 'static, + N: Rpc + PeerTrust + Gossip + Network + 'static, S: Storage + 'static, DB: cita_trie::DB + 'static, Mapping: ServiceMapping + 'static, @@ -492,6 +492,15 @@ where Ok(serde_json::from_str(&exec_resp.succeed_data).expect("Decode metadata failed!")) } + fn tag_consensus(&self, ctx: Context, pub_keys: Vec) -> ProtocolResult<()> { + let peer_ids_bytes = pub_keys + .iter() + .map(|pk| PeerId::from_pubkey_bytes(pk).map(PeerIdExt::into_bytes_ext)) + .collect::>()?; + + self.network.tag_consensus(ctx, peer_ids_bytes) + } + #[muta_apm::derive::tracing_span(kind = "consensus.adapter")] fn report_bad(&self, ctx: Context, feedback: TrustFeedback) { self.network.report(ctx, feedback); @@ -808,7 +817,7 @@ impl OverlordConsensusAdapter, M: MemPool + 'static, - N: Rpc + PeerTrust + Gossip + 'static, + N: Rpc + PeerTrust + Gossip + Network + 'static, S: Storage + 'static, DB: cita_trie::DB + 'static, Mapping: ServiceMapping + 'static, diff --git a/core/consensus/src/synchronization.rs b/core/consensus/src/synchronization.rs index 1ed8afe06..fde66bb21 100644 --- a/core/consensus/src/synchronization.rs +++ b/core/consensus/src/synchronization.rs @@ -338,6 +338,13 @@ impl OverlordSynchronization { metadata.max_tx_size, ); + let pub_keys = metadata + .verifier_list + .iter() + .map(|v| v.pub_key.decode()) + .collect(); + self.adapter.tag_consensus(ctx.clone(), pub_keys)?; + log::info!( "[synchronization]: commit_block, committing block header: {}, committing proof:{:?}", block.header.clone(), diff --git a/core/consensus/src/tests/synchronization.rs b/core/consensus/src/tests/synchronization.rs index 4adce32a2..e0740e869 100644 --- a/core/consensus/src/tests/synchronization.rs +++ b/core/consensus/src/tests/synchronization.rs @@ -300,6 +300,10 @@ impl CommonConsensusAdapter for MockCommonConsensusAdapter { }) } + fn tag_consensus(&self, _: Context, _: Vec) -> ProtocolResult<()> { + Ok(()) + } + fn report_bad(&self, _ctx: Context, _feedback: TrustFeedback) {} fn set_args( diff --git a/core/network/src/peer_manager/mod.rs b/core/network/src/peer_manager/mod.rs index b413681b2..79be20f69 100644 --- a/core/network/src/peer_manager/mod.rs +++ b/core/network/src/peer_manager/mod.rs @@ -270,8 +270,9 @@ impl Deref for ArcSession { } struct Inner { - sessions: RwLock>, - peers: RwLock>, + consensus: RwLock>, + sessions: RwLock>, + peers: RwLock>, listen: RwLock>, } @@ -279,8 +280,9 @@ struct Inner { impl Inner { pub fn new() -> Self { Inner { - sessions: Default::default(), - peers: Default::default(), + consensus: Default::default(), + sessions: Default::default(), + peers: Default::default(), listen: Default::default(), } @@ -427,6 +429,53 @@ impl PeerManagerHandle { listen.into_iter().map(sanitize).collect() } + + pub fn tag(&self, peer_id: &PeerId, tag: PeerTag) -> Result<(), NetworkError> { + let consensus_tag = tag == PeerTag::Consensus; + + if let Some(peer) = self.inner.peer(peer_id) { + peer.tags.insert(tag)?; + } else { + let peer = ArcPeer::new(peer_id.to_owned()); + peer.tags.insert(tag)?; + self.inner.add_peer(peer); + } + + if consensus_tag { + self.inner.consensus.write().insert(peer_id.to_owned()); + } + + Ok(()) + } + + pub fn untag(&self, peer_id: &PeerId, tag: &PeerTag) { + if let Some(peer) = self.inner.peer(peer_id) { + peer.tags.remove(tag); + } + + if tag == &PeerTag::Consensus { + self.inner.consensus.write().remove(peer_id); + } + } + + pub fn tag_consensus(&self, peer_ids: Vec) { + { + for peer_id in self.inner.consensus.read().iter() { + if let Some(peer) = self.inner.peer(peer_id) { + peer.tags.remove(&PeerTag::Consensus) + } + } + } + + for peer_id in peer_ids.iter() { + let _ = self.tag(peer_id, PeerTag::Consensus); + } + + { + let id_set = HashSet::from_iter(peer_ids); + *self.inner.consensus.write() = id_set; + } + } } pub struct PeerManager { diff --git a/core/network/src/peer_manager/test_manager.rs b/core/network/src/peer_manager/test_manager.rs index ac7bae94a..07aa76699 100644 --- a/core/network/src/peer_manager/test_manager.rs +++ b/core/network/src/peer_manager/test_manager.rs @@ -2739,3 +2739,49 @@ async fn should_setup_trust_metric_if_none_on_session_blocked() { "should have 1 bad event" ); } + +#[tokio::test] +async fn should_able_to_tag_peer() { + let (mgr, _conn_rx) = make_manager(0, 20); + let handle = mgr.inner.handle(); + + let peer = make_peer(2077); + handle.tag(&peer.id, PeerTag::Consensus).unwrap(); + + let peer = mgr.core_inner().peer(&peer.id).unwrap(); + assert!(peer.tags.contains(&PeerTag::Consensus)); +} + +#[tokio::test] +async fn should_able_to_untag_peer() { + let (mgr, _conn_rx) = make_manager(0, 20); + let handle = mgr.inner.handle(); + + let peer = make_peer(2077); + handle.tag(&peer.id, PeerTag::Consensus).unwrap(); + + let peer = mgr.core_inner().peer(&peer.id).unwrap(); + assert!(peer.tags.contains(&PeerTag::Consensus)); + + handle.untag(&peer.id, &PeerTag::Consensus); + assert!(!peer.tags.contains(&PeerTag::Consensus)); +} + +#[tokio::test] +async fn should_remove_old_consensus_peer_tag_when_tag_consensus() { + let (mgr, _conn_rx) = make_manager(0, 20); + let handle = mgr.inner.handle(); + + let peer = make_peer(2077); + handle.tag(&peer.id, PeerTag::Consensus).unwrap(); + + let peer = mgr.core_inner().peer(&peer.id).unwrap(); + assert!(peer.tags.contains(&PeerTag::Consensus)); + + let new_consensus = make_peer(3077); + handle.tag_consensus(vec![new_consensus.owned_id()]); + + let new_consensus = mgr.core_inner().peer(&new_consensus.id).unwrap(); + assert!(new_consensus.tags.contains(&PeerTag::Consensus)); + assert!(!peer.tags.contains(&PeerTag::Consensus)); +} diff --git a/core/network/src/service.rs b/core/network/src/service.rs index db479a3d7..1c2a31a7e 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -16,7 +16,8 @@ use futures::{ use log::{debug, error, info}; use protocol::{ traits::{ - Context, Gossip, MessageCodec, MessageHandler, PeerTrust, Priority, Rpc, TrustFeedback, + Context, Gossip, MessageCodec, MessageHandler, Network, PeerTag, PeerTrust, Priority, Rpc, + TrustFeedback, }, Bytes, ProtocolResult, }; @@ -36,13 +37,13 @@ use crate::{ message::RawSessionMessage, metrics::Metrics, outbound::{NetworkGossip, NetworkRpc}, - peer_manager::{PeerManager, PeerManagerConfig, SharedSessions}, + peer_manager::{PeerManager, PeerManagerConfig, PeerManagerHandle, SharedSessions}, protocols::CoreProtocol, reactor::{MessageRouter, Reactor}, rpc_map::RpcMap, selfcheck::SelfCheck, traits::NetworkContext, - NetworkConfig, + NetworkConfig, PeerIdExt, }; #[derive(Clone)] @@ -50,6 +51,7 @@ pub struct NetworkServiceHandle { gossip: NetworkGossip, Snappy>, rpc: NetworkRpc, Snappy>, peer_trust: UnboundedSender, + peer_state: PeerManagerHandle, #[cfg(feature = "diagnostic")] pub diagnostic: Diagnostic, @@ -128,6 +130,32 @@ impl PeerTrust for NetworkServiceHandle { } } +impl Network for NetworkServiceHandle { + fn tag(&self, _: Context, peer_id: Bytes, tag: PeerTag) -> ProtocolResult<()> { + let peer_id = ::from_bytes(peer_id)?; + self.peer_state.tag(&peer_id, tag)?; + + Ok(()) + } + + fn untag(&self, _: Context, peer_id: Bytes, tag: &PeerTag) -> ProtocolResult<()> { + let peer_id = ::from_bytes(peer_id)?; + self.peer_state.untag(&peer_id, tag); + + Ok(()) + } + + fn tag_consensus(&self, _: Context, peer_ids: Vec) -> ProtocolResult<()> { + let peer_ids = peer_ids + .into_iter() + .map(::from_bytes) + .collect::, _>>()?; + self.peer_state.tag_consensus(peer_ids); + + Ok(()) + } +} + enum NetworkConnectionService { NoListen(ConnectionService), // no listen address yet Ready(ConnectionService), @@ -152,9 +180,10 @@ pub struct NetworkService { rpc_map: Arc, // Core service - net_conn_srv: Option, - peer_mgr: Option, - router: Option>, + net_conn_srv: Option, + peer_mgr: Option, + peer_mgr_handle: PeerManagerHandle, + router: Option>, // Metrics metrics: Option>, @@ -204,7 +233,7 @@ impl NetworkService { let proto = CoreProtocol::build() .ping(config.ping_interval, config.ping_timeout, mgr_tx.clone()) .identify(peer_mgr_handle.clone(), mgr_tx.clone()) - .discovery(peer_mgr_handle, mgr_tx.clone(), disc_sync_interval) + .discovery(peer_mgr_handle.clone(), mgr_tx.clone(), disc_sync_interval) .transmitter(raw_msg_tx.clone()) .build(); @@ -249,6 +278,7 @@ impl NetworkService { net_conn_srv: Some(NetworkConnectionService::NoListen(conn_srv)), peer_mgr: Some(peer_mgr), + peer_mgr_handle, router: Some(router), metrics: Some(metrics), @@ -322,6 +352,7 @@ impl NetworkService { gossip: self.gossip.clone(), rpc: self.rpc.clone(), peer_trust: self.mgr_tx.clone(), + peer_state: self.peer_mgr_handle.clone(), #[cfg(feature = "diagnostic")] diagnostic: self.diagnostic.clone(), diff --git a/protocol/src/traits/consensus.rs b/protocol/src/traits/consensus.rs index 96cc33901..553f55258 100644 --- a/protocol/src/traits/consensus.rs +++ b/protocol/src/traits/consensus.rs @@ -128,6 +128,8 @@ pub trait CommonConsensusAdapter: Send + Sync { proposer: Address, ) -> ProtocolResult; + fn tag_consensus(&self, ctx: Context, peer_ids: Vec) -> ProtocolResult<()>; + fn report_bad(&self, ctx: Context, feedback: TrustFeedback); fn set_args(&self, context: Context, timeout_gap: u64, cycles_limit: u64, max_tx_size: u64); diff --git a/protocol/src/traits/network.rs b/protocol/src/traits/network.rs index 0dfedf9d6..396025cfa 100644 --- a/protocol/src/traits/network.rs +++ b/protocol/src/traits/network.rs @@ -157,8 +157,9 @@ pub trait Rpc: Send + Sync { } pub trait Network: Send + Sync { - fn add_tag(&self, ctx: Context, peer_id: Bytes, tag: PeerTag) -> ProtocolResult<()>; - fn remove_tag(&self, ctx: Context, peer_id: Bytes, tag: &PeerTag) -> ProtocolResult<()>; + fn tag(&self, ctx: Context, peer_id: Bytes, tag: PeerTag) -> ProtocolResult<()>; + fn untag(&self, ctx: Context, peer_id: Bytes, tag: &PeerTag) -> ProtocolResult<()>; + fn tag_consensus(&self, ctx: Context, peer_ids: Vec) -> ProtocolResult<()>; } pub trait PeerTrust: Send + Sync { diff --git a/src/default_start.rs b/src/default_start.rs index 9d91faa18..4b1e2ed26 100644 --- a/src/default_start.rs +++ b/src/default_start.rs @@ -38,7 +38,7 @@ use core_network::{NetworkConfig, NetworkService}; use core_storage::{adapter::rocks::RocksAdapter, ImplStorage, StorageError}; use framework::binding::state::RocksTrieDB; use framework::executor::{ServiceExecutor, ServiceExecutorFactory}; -use protocol::traits::{APIAdapter, Context, MemPool, NodeInfo, ServiceMapping, Storage}; +use protocol::traits::{APIAdapter, Context, MemPool, Network, NodeInfo, ServiceMapping, Storage}; use protocol::types::{Address, Block, BlockHeader, Genesis, Hash, Metadata, Proof, Validator}; use protocol::{fixed_codec::FixedCodec, ProtocolResult}; @@ -399,6 +399,16 @@ pub async fn start( lock, )); + let pub_keys = metadata + .verifier_list + .iter() + .map(|v| v.pub_key.decode()) + .collect(); + + network_service + .handle() + .tag_consensus(Context::new(), pub_keys)?; + // Re-execute block from exec_height + 1 to current_height, so that init the // lost current status. log::info!("Re-execute from {} to {}", exec_height + 1, current_height); diff --git a/tests/trust_metric_all/node/full_node/default_start.rs b/tests/trust_metric_all/node/full_node/default_start.rs index 296911ff2..b6f6b20cf 100644 --- a/tests/trust_metric_all/node/full_node/default_start.rs +++ b/tests/trust_metric_all/node/full_node/default_start.rs @@ -40,7 +40,7 @@ use core_mempool::{ use core_network::{DiagnosticEvent, NetworkConfig, NetworkService}; use core_storage::{ImplStorage, StorageError}; use framework::executor::{ServiceExecutor, ServiceExecutorFactory}; -use protocol::traits::{APIAdapter, Context, MemPool, NodeInfo, ServiceMapping, Storage}; +use protocol::traits::{APIAdapter, Context, MemPool, Network, NodeInfo, ServiceMapping, Storage}; use protocol::types::{Address, Block, BlockHeader, Genesis, Hash, Metadata, Proof, Validator}; use protocol::{fixed_codec::FixedCodec, ProtocolResult}; @@ -371,6 +371,16 @@ pub async fn start( lock, )); + let pub_keys = metadata + .verifier_list + .iter() + .map(|v| v.pub_key.decode()) + .collect(); + + network_service + .handle() + .tag_consensus(Context::new(), pub_keys)?; + // Re-execute block from exec_height + 1 to current_height, so that init the // lost current status. log::info!("Re-execute from {} to {}", exec_height + 1, current_height);