diff --git a/chain/network/src/lib.rs b/chain/network/src/lib.rs index 9ae68ed86c9..5683ed143cf 100644 --- a/chain/network/src/lib.rs +++ b/chain/network/src/lib.rs @@ -1,22 +1,22 @@ pub use crate::peer_manager::peer_manager_actor::{Event, PeerManagerActor}; pub use crate::peer_manager::peer_store::iter_peers_from_store; -#[cfg(feature = "test_features")] -pub use crate::stats::metrics::RECEIVED_INFO_ABOUT_ITSELF; mod accounts_data; mod concurrency; mod network_protocol; mod peer; mod peer_manager; +mod private_actix; +mod stats; +mod store; pub mod actix; pub mod blacklist; pub mod config; pub mod config_json; -pub(crate) mod private_actix; pub mod routing; -pub(crate) mod stats; -pub(crate) mod store; + +pub mod tcp; pub mod test_utils; pub mod time; pub mod types; diff --git a/chain/network/src/network_protocol/edge.rs b/chain/network/src/network_protocol/edge.rs index 59444b3061c..545e7bb26b3 100644 --- a/chain/network/src/network_protocol/edge.rs +++ b/chain/network/src/network_protocol/edge.rs @@ -25,12 +25,7 @@ pub struct PartialEdgeInfo { impl PartialEdgeInfo { pub fn new(peer0: &PeerId, peer1: &PeerId, nonce: u64, secret_key: &SecretKey) -> Self { - let data = if peer0 < peer1 { - Edge::build_hash(peer0, peer1, nonce) - } else { - Edge::build_hash(peer1, peer0, nonce) - }; - + let data = Edge::build_hash(peer0, peer1, nonce); let signature = secret_key.sign(data.as_ref()); Self { nonce, signature } } @@ -114,7 +109,7 @@ impl Edge { /// Build the hash of the edge given its content. /// It is important that peer0 < peer1 at this point. pub fn build_hash(peer0: &PeerId, peer1: &PeerId, nonce: u64) -> CryptoHash { - debug_assert!(peer0 < peer1); + let (peer0, peer1) = if peer0 < peer1 { (peer0, peer1) } else { (peer1, peer0) }; CryptoHash::hash_borsh(&(peer0, peer1, nonce)) } @@ -130,11 +125,7 @@ impl Edge { /// to verify the signature. 
pub fn partial_verify(peer0: &PeerId, peer1: &PeerId, edge_info: &PartialEdgeInfo) -> bool { let pk = peer1.public_key(); - let data = if peer0 < peer1 { - Edge::build_hash(peer0, peer1, edge_info.nonce) - } else { - Edge::build_hash(peer1, peer0, edge_info.nonce) - }; + let data = Edge::build_hash(peer0, peer1, edge_info.nonce); edge_info.signature.verify(data.as_ref(), pk) } diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 815a0c971da..a5b3f7a55f8 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -10,30 +10,28 @@ use crate::peer::tracker::Tracker; use crate::peer_manager::connection; use crate::peer_manager::network_state::NetworkState; use crate::peer_manager::peer_manager_actor::Event; -use crate::private_actix::PeersResponse; -use crate::private_actix::{PeerToManagerMsg, PeerToManagerMsgResp}; use crate::private_actix::{ - PeersRequest, RegisterPeer, RegisterPeerResponse, SendMessage, Unregister, + PeerToManagerMsg, PeerToManagerMsgResp, PeersRequest, PeersResponse, RegisterPeer, + RegisterPeerError, RegisterPeerResponse, SendMessage, Unregister, }; use crate::routing::edge::verify_nonce; -use crate::sink::Sink; use crate::stats::metrics; +use crate::tcp; use crate::time; use crate::types::{ Ban, Handshake, HandshakeFailureReason, NetworkClientMessages, NetworkClientResponses, NetworkViewClientMessages, NetworkViewClientResponses, PeerIdOrHash, PeerManagerRequest, PeerManagerRequestWithContext, PeerMessage, PeerType, ReasonForBan, StateResponseInfo, }; +use near_o11y::log_assert; use actix::{ Actor, ActorContext, ActorFutureExt, AsyncContext, Context, ContextFutureSpawner, Handler, Running, WrapFuture, }; -use anyhow::Context as _; use lru::LruCache; use near_crypto::Signature; use near_performance_metrics_macros::perf; -use near_primitives::block::GenesisId; use near_primitives::logging; use near_primitives::network::PeerId; use near_primitives::utils::DisplayOption; @@ -63,24 +61,42 @@ const ROUTED_MESSAGE_CACHE_SIZE: usize = 1000; /// Duplicated messages will be dropped if routed through the same peer multiple times. const DROP_DUPLICATED_MESSAGES_PERIOD: time::Duration = time::Duration::milliseconds(50); -// A guard which reports PeerActorStopped event when dropped. -// Ideally it should rather wrap TcpStream somehow, however the stream -// itself is being split into read/write ends and wrapped, so it -// is not exactly clear how it would work. Instead we just keep it -// as a separate field of PeerActor. -// -// TODO(gprusak): rename PeerActorStopped to ConnectionClosed: -// TCP connection can be closed even before the PeerActor is started, -// and we want to report that. 
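// Sketch (illustrative, not part of the patch): the edge.rs change above moves the `peer0 < peer1` canonicalization into `Edge::build_hash` itself, which is why both `PartialEdgeInfo::new` and `Edge::partial_verify` can now pass their arguments in either order and still hash identical bytes. A self-contained illustration of the same idea, with `u64` stand-ins for `PeerId` and a hypothetical `hash` helper in place of `CryptoHash::hash_borsh`:
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn hash<T: Hash>(t: &T) -> u64 {
    let mut h = DefaultHasher::new();
    t.hash(&mut h);
    h.finish()
}

fn build_hash(peer0: u64, peer1: u64, nonce: u64) -> u64 {
    // Normalize the unordered pair before hashing, so callers don't have to.
    let (lo, hi) = if peer0 < peer1 { (peer0, peer1) } else { (peer1, peer0) };
    hash(&(lo, hi, nonce))
}

fn main() {
    // Both argument orders produce the same digest.
    assert_eq!(build_hash(1, 2, 7), build_hash(2, 1, 7));
}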
-struct ConnectionGuard { - peer_addr: SocketAddr, - event_sink: Sink, +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ConnectionClosedEvent { + pub(crate) stream_id: tcp::StreamId, + pub(crate) reason: ClosingReason, } -impl Drop for ConnectionGuard { - fn drop(&mut self) { - self.event_sink.push(Event::ConnectionClosed(self.peer_addr.clone())); - } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct HandshakeStartedEvent { + pub(crate) stream_id: tcp::StreamId, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct HandshakeCompletedEvent { + pub(crate) stream_id: tcp::StreamId, + pub(crate) edge: Edge, +} + +#[derive(thiserror::Error, Clone, PartialEq, Eq, Debug)] +pub(crate) enum ClosingReason { + #[error("too many inbound connections in connecting state")] + TooManyInbound, + #[error("outbound not allowed: {0}")] + OutboundNotAllowed(connection::PoolError), + + #[error("peer banned: {0:?}")] + Ban(ReasonForBan), + #[error("handshake failed")] + HandshakeFailed, + #[error("rejected by PeerManager: {0:?}")] + RejectedByPeerManager(RegisterPeerError), + #[error("stream error")] + StreamError, + #[error("PeerManager requested to close the connection")] + PeerManager, + #[error("Received DisconnectMessage from peer")] + DisconnectMessage, } pub(crate) struct PeerActor { @@ -91,6 +107,8 @@ pub(crate) struct PeerActor { /// This node's id and address (either listening or socket address). my_node_info: PeerInfo, + /// TEST-ONLY + stream_id: crate::tcp::StreamId, /// Peer address from connection. peer_addr: SocketAddr, /// Peer type. @@ -116,6 +134,8 @@ pub(crate) struct PeerActor { /// Peer status. peer_status: PeerStatus, + closing_reason: Option, + /// Peer id and info. Present when Ready, /// or (for outbound only) when Connecting. // TODO: move it to ConnectingStatus::Outbound. @@ -123,7 +143,6 @@ pub(crate) struct PeerActor { peer_info: DisplayOption, /// Shared state of the connection. Present when ready. connection: Option>, - _connection_guard: ConnectionGuard, } impl Debug for PeerActor { @@ -132,17 +151,10 @@ impl Debug for PeerActor { } } -#[derive(Debug)] -pub(crate) enum StreamConfig { - Inbound, - Outbound { peer_id: PeerId }, -} - #[derive(Clone, Debug)] struct HandshakeSpec { /// ID of the peer on the other side of the connection. peer_id: PeerId, - genesis_id: GenesisId, protocol_version: ProtocolVersion, partial_edge_info: PartialEdgeInfo, } @@ -150,36 +162,45 @@ struct HandshakeSpec { impl PeerActor { pub(crate) fn spawn( clock: time::Clock, - stream: tokio::net::TcpStream, - stream_config: StreamConfig, + stream: tcp::Stream, force_encoding: Option, network_state: Arc, ) -> anyhow::Result> { - let peer_addr = stream.peer_addr().context("stream.peer_addr()")?; - // WARNING: connection guard is reported AFTER peer_addr is resolved, - // so if resolving fails, Event::ConnectionClosed won't be emitted. 
- let connection_guard = ConnectionGuard { - event_sink: network_state.config.event_sink.clone(), - peer_addr: peer_addr.clone(), - }; - let connecting_status = match &stream_config { - StreamConfig::Inbound => ConnectingStatus::Inbound( + let stream_id = stream.id(); + match Self::spawn_inner(clock, stream, force_encoding, network_state.clone()) { + Ok(it) => Ok(it), + Err(reason) => { + network_state.config.event_sink.push(Event::ConnectionClosed( + ConnectionClosedEvent { stream_id, reason: reason.clone() }, + )); + Err(reason.into()) + } + } + } + + fn spawn_inner( + clock: time::Clock, + stream: tcp::Stream, + force_encoding: Option, + network_state: Arc, + ) -> Result, ClosingReason> { + let connecting_status = match &stream.type_ { + crate::tcp::StreamType::Inbound => ConnectingStatus::Inbound( network_state .inbound_handshake_permits .clone() .try_acquire_owned() - .context("too many connections in Connecting state")?, + .map_err(|_| ClosingReason::TooManyInbound)?, ), - StreamConfig::Outbound { peer_id } => ConnectingStatus::Outbound { + crate::tcp::StreamType::Outbound { peer_id } => ConnectingStatus::Outbound { _permit: network_state .tier2 .start_outbound(peer_id.clone()) - .context("tier2.start_outbound()")?, + .map_err(ClosingReason::OutboundNotAllowed)?, handshake_spec: HandshakeSpec { partial_edge_info: network_state.propose_edge(peer_id, None), protocol_version: PROTOCOL_VERSION, peer_id: peer_id.clone(), - genesis_id: network_state.genesis_id.clone(), }, }, }; @@ -192,14 +213,19 @@ impl PeerActor { // Start PeerActor on separate thread. Ok(Self::start_in_arbiter(&actix::Arbiter::new().handle(), move |ctx| { let stats = Arc::new(connection::Stats::default()); - let framed = stream::FramedStream::spawn(ctx, peer_addr, stream, stats.clone()); + let stream_id = stream.id(); + let peer_addr = stream.peer_addr; + let stream_type = stream.type_.clone(); + let framed = stream::FramedStream::spawn(ctx, stream, stats.clone()); Self { + closing_reason: None, clock, my_node_info, + stream_id, peer_addr, - peer_type: match &stream_config { - StreamConfig::Inbound => PeerType::Inbound, - StreamConfig::Outbound { .. } => PeerType::Outbound, + peer_type: match &stream_type { + tcp::StreamType::Inbound => PeerType::Inbound, + tcp::StreamType::Outbound { .. 
} => PeerType::Outbound, }, peer_status: PeerStatus::Connecting(connecting_status), framed, @@ -208,18 +234,17 @@ impl PeerActor { routed_message_cache: LruCache::new(ROUTED_MESSAGE_CACHE_SIZE), protocol_buffers_supported: false, force_encoding, - peer_info: match &stream_config { - StreamConfig::Inbound => None, - StreamConfig::Outbound { peer_id } => Some(PeerInfo { + peer_info: match &stream_type { + tcp::StreamType::Inbound => None, + tcp::StreamType::Outbound { peer_id } => Some(PeerInfo { id: peer_id.clone(), - addr: Some(peer_addr.clone()), + addr: Some(peer_addr), account_id: None, }), } .into(), network_state, connection: None, - _connection_guard: connection_guard, } })) } @@ -299,28 +324,29 @@ impl PeerActor { fn send_handshake(&self, spec: HandshakeSpec) { let chain_info = self.network_state.chain_info.load(); - let msg = Handshake { + let handshake = Handshake { protocol_version: spec.protocol_version, oldest_supported_version: PEER_MIN_ALLOWED_PROTOCOL_VERSION, - sender_peer_id: self.my_node_id().clone(), - target_peer_id: spec.peer_id.clone(), - sender_listen_port: self.my_node_info.addr_port(), + sender_peer_id: self.network_state.config.node_id(), + target_peer_id: spec.peer_id, + sender_listen_port: self.network_state.config.node_addr.map(|a| a.port()), sender_chain_info: PeerChainInfoV2 { - genesis_id: spec.genesis_id, + genesis_id: self.network_state.genesis_id.clone(), height: chain_info.height, tracked_shards: chain_info.tracked_shards.clone(), archival: self.network_state.config.archive, }, partial_edge_info: spec.partial_edge_info, }; - let msg = PeerMessage::Handshake(msg); + let msg = PeerMessage::Handshake(handshake); self.send_message_or_log(&msg); } - fn ban_peer(&mut self, ctx: &mut Context, ban_reason: ReasonForBan) { - warn!(target: "network", "Banning peer {} for {:?}", self.peer_info, ban_reason); - self.peer_status = PeerStatus::Banned(ban_reason); - // On stopping Banned signal will be sent to PeerManager + fn stop(&mut self, ctx: &mut Context, reason: ClosingReason) { + // Only the first call to stop sets the closing_reason. + if self.closing_reason.is_none() { + self.closing_reason = Some(reason); + } ctx.stop(); } @@ -590,7 +616,7 @@ impl PeerActor { // TODO: count as malicious behavior? } Ok(NetworkClientResponses::Ban { ban_reason }) => { - act.ban_peer(ctx, ban_reason); + act.stop(ctx, ClosingReason::Ban(ban_reason)); } Err(err) => { error!( @@ -655,22 +681,22 @@ impl PeerActor { ConnectingStatus::Outbound { handshake_spec: spec, .. } => { if handshake.protocol_version != spec.protocol_version { warn!(target: "network", "Protocol version mismatch. Disconnecting peer {}", handshake.sender_peer_id); - ctx.stop(); + self.stop(ctx, ClosingReason::HandshakeFailed); return; } - if handshake.sender_chain_info.genesis_id != spec.genesis_id { + if handshake.sender_chain_info.genesis_id != self.network_state.genesis_id { warn!(target: "network", "Genesis mismatch. Disconnecting peer {}", handshake.sender_peer_id); - ctx.stop(); + self.stop(ctx, ClosingReason::HandshakeFailed); return; } if handshake.sender_peer_id != spec.peer_id { warn!(target: "network", "PeerId mismatch. Disconnecting peer {}", handshake.sender_peer_id); - ctx.stop(); + self.stop(ctx, ClosingReason::HandshakeFailed); return; } if handshake.partial_edge_info.nonce != spec.partial_edge_info.nonce { warn!(target: "network", "Nonce mismatch. 
Disconnecting peer {}", handshake.sender_peer_id); - ctx.stop(); + self.stop(ctx, ClosingReason::HandshakeFailed); return; } } @@ -711,7 +737,7 @@ impl PeerActor { // Verify if nonce is sane. if let Err(err) = verify_nonce(&self.clock, handshake.partial_edge_info.nonce) { debug!(target: "network", nonce=?handshake.partial_edge_info.nonce, my_node_id = ?self.my_node_id(), peer_id=?handshake.sender_peer_id, "bad nonce, disconnecting: {err}"); - ctx.stop(); + self.stop(ctx, ClosingReason::HandshakeFailed); return; } // Check that the received nonce is greater than the current nonce of this connection. @@ -728,13 +754,6 @@ impl PeerActor { } } - if handshake.sender_peer_id == self.my_node_info.id { - metrics::RECEIVED_INFO_ABOUT_ITSELF.inc(); - debug!(target: "network", "Received info about itself. Disconnecting this peer."); - ctx.stop(); - return; - } - // Verify that the received partial edge is valid. // WARNING: signature is verified against the 2nd argument. if !Edge::partial_verify( @@ -743,8 +762,7 @@ impl PeerActor { &handshake.partial_edge_info, ) { warn!(target: "network", "partial edge with invalid signature, disconnecting"); - self.ban_peer(ctx, ReasonForBan::InvalidSignature); - ctx.stop(); + self.stop(ctx, ClosingReason::Ban(ReasonForBan::InvalidSignature)); return; } @@ -767,6 +785,11 @@ impl PeerActor { ); debug_assert!(edge.verify()); + // TODO(gprusak): not enabling a port for listening is also a valid setup. + // In that case peer_info.addr should be None (same as now), however + // we still should do the check against the PeerStore::blacklist. + // Currently PeerManager is rejecting connections with peer_info.addr == None + // preemptively. let peer_info = PeerInfo { id: handshake.sender_peer_id.clone(), addr: handshake @@ -852,7 +875,6 @@ impl PeerActor { if act.peer_type == PeerType::Inbound { act.send_handshake(HandshakeSpec{ peer_id: handshake.sender_peer_id.clone(), - genesis_id: act.network_state.genesis_id.clone(), protocol_version: handshake.protocol_version, partial_edge_info: partial_edge_info, }); @@ -865,14 +887,22 @@ impl PeerActor { requesting_full_sync: true, })); } - actix::fut::ready(()) + act.network_state.config.event_sink.push(Event::HandshakeCompleted(HandshakeCompletedEvent{ + stream_id: act.stream_id, + edge: connection.edge.clone(), + })); }, - err => { + Ok(RegisterPeerResponse::Reject(err)) => { + info!(target: "network", "{:?}: Connection with {:?} rejected by PeerManager: {:?}", act.my_node_id(),connection.peer_info.id,err); + act.stop(ctx,ClosingReason::RejectedByPeerManager(err)); + } + Err(err) => { + // TODO(gprusak): this shouldn't happen at all. info!(target: "network", "{:?}: Peer with handshake {:?} wasn't consolidated, disconnecting: {err:?}", act.my_node_id(), handshake); - ctx.stop(); - actix::fut::ready(()) + act.stop(ctx,ClosingReason::HandshakeFailed); } - } + }; + actix::fut::ready(()) }) .wait(ctx); } @@ -892,7 +922,7 @@ impl Actor for PeerActor { move |act, ctx| match &act.peer_status { PeerStatus::Connecting { .. 
} => { info!(target: "network", "Handshake timeout expired for {}", act.peer_info); - ctx.stop(); + act.stop(ctx, ClosingReason::HandshakeFailed); } _ => {} }, @@ -904,14 +934,17 @@ impl Actor for PeerActor { { self.send_handshake(handshake_spec.clone()); } - self.network_state.config.event_sink.push(Event::PeerActorStarted(self.peer_addr)); + self.network_state + .config + .event_sink + .push(Event::HandshakeStarted(HandshakeStartedEvent { stream_id: self.stream_id })); } fn stopping(&mut self, _: &mut Self::Context) -> Running { metrics::PEER_CONNECTIONS_TOTAL.dec(); debug!(target: "network", "{:?}: [status = {:?}] Peer {} disconnected.", self.my_node_info.id, self.peer_status, self.peer_info); if let Some(peer_info) = self.peer_info.as_ref() { - if let PeerStatus::Banned(ban_reason) = self.peer_status { + if let Some(ClosingReason::Ban(ban_reason)) = self.closing_reason { let _ = self.network_state.peer_manager_addr.do_send(PeerToManagerMsg::Ban(Ban { peer_id: peer_info.id.clone(), ban_reason, @@ -939,6 +972,13 @@ impl Actor for PeerActor { } fn stopped(&mut self, _ctx: &mut Self::Context) { + // closing_reason may be None in case the whole actix system is stopped. + // It happens a lot in tests. + if let Some(reason) = self.closing_reason.take() { + self.network_state.config.event_sink.push(Event::ConnectionClosed( + ConnectionClosedEvent { stream_id: self.stream_id, reason }, + )); + } actix::Arbiter::current().stop(); } } @@ -948,7 +988,7 @@ impl actix::Handler for PeerActor { fn handle(&mut self, err: stream::Error, ctx: &mut Self::Context) { let expected = match &err { stream::Error::Recv(stream::RecvError::MessageTooLarge { .. }) => { - self.ban_peer(ctx, ReasonForBan::Abusive); + self.stop(ctx, ClosingReason::Ban(ReasonForBan::Abusive)); true } // It is expected in a sense that the peer might be just slow. @@ -956,19 +996,21 @@ impl actix::Handler for PeerActor { stream::Error::Recv(stream::RecvError::IO(err)) | stream::Error::Send(stream::SendError::IO(err)) => match err.kind() { // Connection has been closed. - io::ErrorKind::UnexpectedEof | io::ErrorKind::ConnectionReset => true, + io::ErrorKind::UnexpectedEof + | io::ErrorKind::ConnectionReset + | io::ErrorKind::BrokenPipe => true, + // When stopping tokio runtime, an "IO driver has terminated" is sometimes + // returned. + io::ErrorKind::Other => true, // It is unexpected in a sense that stream got broken in an unexpected way. // In case you encounter an error that was actually to be expected, // please add it here and document. _ => false, }, }; - if expected { - tracing::error!(target: "network", ?err, "Closing connection to {}", self.peer_info); - } else { - tracing::info!(target: "network", ?err, "Closing connection to {}", self.peer_info); - } - ctx.stop(); + log_assert!(expected, "unexpected closing reason: {err}"); + tracing::info!(target: "network", ?err, "Closing connection to {}", self.peer_info); + self.stop(ctx, ClosingReason::StreamError); } } @@ -980,6 +1022,11 @@ impl actix::Handler for PeerActor { // TODO(#5155) We should change our code to track size of messages received from Peer // as long as it travels to PeerManager, etc. + if self.closing_reason.is_some() { + tracing::warn!(target: "network", "Received message from closing connection {:?}. 
Ignoring", self.peer_type); + return; + } + self.update_stats_on_receiving_message(msg.len()); let mut peer_msg = match self.parse_message(&msg) { Ok(msg) => msg, @@ -1051,7 +1098,7 @@ impl actix::Handler for PeerActor { match reason { HandshakeFailureReason::GenesisMismatch(genesis) => { warn!(target: "network", "Attempting to connect to a node ({}) with a different genesis block. Our genesis: {:?}, their genesis: {:?}", peer_info, self.network_state.genesis_id, genesis); - ctx.stop(); + self.stop(ctx, ClosingReason::HandshakeFailed); } HandshakeFailureReason::ProtocolVersionMismatch { version, @@ -1063,7 +1110,7 @@ impl actix::Handler for PeerActor { || common_version < PEER_MIN_ALLOWED_PROTOCOL_VERSION { warn!(target: "network", "Unable to connect to a node ({}) due to a network protocol version mismatch. Our version: {:?}, their: {:?}", peer_info, (PROTOCOL_VERSION, PEER_MIN_ALLOWED_PROTOCOL_VERSION), (version, oldest_supported_version)); - ctx.stop(); + self.stop(ctx, ClosingReason::HandshakeFailed); return; } handshake_spec.protocol_version = common_version; @@ -1078,7 +1125,7 @@ impl actix::Handler for PeerActor { self.network_state .peer_manager_addr .do_send(PeerToManagerMsg::UpdatePeerInfo(peer_info)); - ctx.stop(); + self.stop(ctx, ClosingReason::HandshakeFailed); } } } @@ -1088,10 +1135,25 @@ impl actix::Handler for PeerActor { PeerStatus::Connecting(ConnectingStatus::Outbound { handshake_spec, .. }), PeerMessage::LastEdge(edge), ) => { - // Disconnect if neighbor proposed an invalid edge. - if !edge.verify() { + // Check that the edge provided: + let ok = + // - is for the relevant pair of peers + edge.key()==&Edge::make_key(self.my_node_info.id.clone(),handshake_spec.peer_id.clone()) && + // - is not younger than what we proposed originally. This protects us from + // a situation in which the peer presents us with a very outdated edge e, + // and then we sign a new edge with nonce e.nonce+1 which is also outdated. + // It may still happen that an edge with an old nonce gets signed, but only + // if both nodes not know about the newer edge. We don't defend against that. + // Also a malicious peer might send the LastEdge with the edge we just + // signed (pretending that it is old) but we cannot detect that, because the + // signatures are currently deterministic. + edge.nonce() >= handshake_spec.partial_edge_info.nonce && + // - is a correctly signed edge + edge.verify(); + // Disconnect if neighbor sent an invalid edge. + if !ok { info!(target: "network", "{:?}: Peer {:?} sent invalid edge. Disconnect.", self.my_node_id(), self.peer_addr); - ctx.stop(); + self.stop(ctx, ClosingReason::HandshakeFailed); return; } // Recreate the edge with a newer nonce. @@ -1108,7 +1170,7 @@ impl actix::Handler for PeerActor { } (PeerStatus::Ready, PeerMessage::Disconnect) => { debug!(target: "network", "Disconnect signal. Me: {:?} Peer: {:?}", self.my_node_info.id, self.other_peer_id()); - ctx.stop(); + self.stop(ctx, ClosingReason::DisconnectMessage); } (PeerStatus::Ready, PeerMessage::Handshake(_)) => { // Received handshake after already have seen handshake from this peer. 
@@ -1131,6 +1193,7 @@ impl actix::Handler for PeerActor { self.network_state .peer_manager_addr .do_send(PeerToManagerMsg::PeersResponse(PeersResponse { peers })); + self.network_state.config.event_sink.push(Event::MessageProcessed(peer_msg)); } (PeerStatus::Ready, PeerMessage::RequestUpdateNonce(edge_info)) => self .network_state @@ -1146,10 +1209,11 @@ impl actix::Handler for PeerActor { act.send_message_or_log(&PeerMessage::ResponseUpdateNonce(*edge)); } Ok(PeerToManagerMsgResp::BanPeer(reason_for_ban)) => { - act.ban_peer(ctx, reason_for_ban); + act.stop(ctx, ClosingReason::Ban(reason_for_ban)); } _ => {} } + act.network_state.config.event_sink.push(Event::MessageProcessed(peer_msg)); actix::fut::ready(()) }) .spawn(ctx), @@ -1161,10 +1225,11 @@ impl actix::Handler for PeerActor { .then(|res, act, ctx| { match res { Ok(PeerToManagerMsgResp::BanPeer(reason_for_ban)) => { - act.ban_peer(ctx, reason_for_ban) + act.stop(ctx, ClosingReason::Ban(reason_for_ban)) } _ => {} } + act.network_state.config.event_sink.push(Event::MessageProcessed(peer_msg)); actix::fut::ready(()) }) .spawn(ctx), @@ -1217,7 +1282,7 @@ impl actix::Handler for PeerActor { .into_actor(self) .map(|ban_reason, act, ctx| { if let Some(ban_reason) = ban_reason { - act.ban_peer(ctx, ban_reason); + act.stop(ctx, ClosingReason::Ban(ban_reason)); } act.network_state.config.event_sink.push(Event::MessageProcessed(peer_msg)); }) @@ -1232,7 +1297,7 @@ impl actix::Handler for PeerActor { // Receive invalid routed message from peer. if !msg.verify() { - self.ban_peer(ctx, ReasonForBan::InvalidSignature); + self.stop(ctx, ClosingReason::Ban(ReasonForBan::InvalidSignature)); return; } let from = self.other_peer_id().unwrap().clone(); @@ -1318,15 +1383,15 @@ impl Handler for PeerActor { tracing::trace_span!(target: "network", "handle", handler = "PeerManagerRequest") .entered(); span.set_parent(msg.context); - let msg = msg.msg; - let _d = - delay_detector::DelayDetector::new(|| format!("peer manager request {:?}", msg).into()); - match msg { + let _d = delay_detector::DelayDetector::new(|| { + format!("peer manager request {:?}", msg.msg).into() + }); + match msg.msg { PeerManagerRequest::BanPeer(ban_reason) => { - self.ban_peer(ctx, ban_reason); + self.stop(ctx, ClosingReason::Ban(ban_reason)); } PeerManagerRequest::UnregisterPeer => { - ctx.stop(); + self.stop(ctx, ClosingReason::PeerManager); } } } @@ -1342,9 +1407,9 @@ enum ConnectingStatus { /// State machine of the PeerActor. /// The transition graph for inbound connection is: -/// Connecting(Inbound) -> Ready -> Banned +/// Connecting(Inbound) -> Ready /// for outbound connection is: -/// Connecting(Outbound) -> Ready -> Banned +/// Connecting(Outbound) -> Ready /// /// From every state the PeerActor can be immediately shut down. /// In the Connecting state only Handshake-related messages are allowed. @@ -1358,6 +1423,4 @@ enum PeerStatus { Connecting(ConnectingStatus), /// Ready to go. Ready, - /// Banned, should shutdown this peer. 
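// Sketch (illustrative, not part of the patch): `PeerStatus::Banned` above is replaced by recording a `ClosingReason` via `PeerActor::stop`. Only the first reason is kept, and it is reported exactly once when the actor stops. A minimal, actix-free model of that bookkeeping with a trimmed-down enum:
#[derive(Debug, Clone, PartialEq, Eq)]
enum ClosingReason {
    HandshakeFailed,
    DisconnectMessage,
}

#[derive(Default)]
struct Peer {
    closing_reason: Option<ClosingReason>,
}

impl Peer {
    fn stop(&mut self, reason: ClosingReason) {
        // Only the first call to stop sets the closing_reason.
        if self.closing_reason.is_none() {
            self.closing_reason = Some(reason);
        }
    }

    fn stopped(&mut self) -> Option<ClosingReason> {
        // The reason is consumed when the ConnectionClosed event is emitted.
        self.closing_reason.take()
    }
}

fn main() {
    let mut peer = Peer::default();
    peer.stop(ClosingReason::HandshakeFailed);
    peer.stop(ClosingReason::DisconnectMessage); // ignored: a reason is already set
    assert_eq!(peer.stopped(), Some(ClosingReason::HandshakeFailed));
    assert_eq!(peer.stopped(), None);
}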
- Banned(ReasonForBan), } diff --git a/chain/network/src/peer/stream.rs b/chain/network/src/peer/stream.rs index f1235967a37..e01c5f97896 100644 --- a/chain/network/src/peer/stream.rs +++ b/chain/network/src/peer/stream.rs @@ -1,5 +1,6 @@ use crate::peer_manager::connection; use crate::stats::metrics; +use crate::tcp; use actix::fut::future::wrap_future; use actix::AsyncContext as _; use bytesize::{GIB, MIB}; @@ -21,7 +22,7 @@ type WriteHalf = tokio::io::WriteHalf; #[derive(thiserror::Error, Debug)] pub(crate) enum SendError { - #[error("IO error")] + #[error("IO error: {0}")] IO(#[source] io::Error), #[error("queue is full, got {got_bytes}B, max capacity is {want_max_bytes}")] QueueOverflow { got_bytes: usize, want_max_bytes: usize }, @@ -49,9 +50,9 @@ pub(crate) struct Frame(pub Vec); #[derive(thiserror::Error, Debug, actix::Message)] #[rtype(result = "()")] pub(crate) enum Error { - #[error("send")] + #[error("send: {0}")] Send(#[source] SendError), - #[error("recv")] + #[error("recv: {0}")] Recv(#[source] RecvError), } @@ -70,15 +71,14 @@ where { pub fn spawn( ctx: &mut actix::Context, - peer_addr: SocketAddr, - stream: tokio::net::TcpStream, + stream: tcp::Stream, stats: Arc, ) -> Self { - let (tcp_recv, tcp_send) = tokio::io::split(stream); + let (tcp_recv, tcp_send) = tokio::io::split(stream.stream); let (queue_send, queue_recv) = tokio::sync::mpsc::unbounded_channel(); let send_buf_size_metric = Arc::new(metrics::MetricGuard::new( &*metrics::PEER_DATA_WRITE_BUFFER_SIZE, - vec![peer_addr.to_string()], + vec![stream.peer_addr.to_string()], )); ctx.spawn(wrap_future({ let addr = ctx.address(); @@ -95,7 +95,7 @@ where let stats = stats.clone(); async move { if let Err(err) = - Self::run_recv_loop(peer_addr, tcp_recv, addr.clone(), stats).await + Self::run_recv_loop(stream.peer_addr, tcp_recv, addr.clone(), stats).await { addr.do_send(Error::Recv(err)); } diff --git a/chain/network/src/peer/testonly.rs b/chain/network/src/peer/testonly.rs index 47e5ed69fcc..bff0ec19461 100644 --- a/chain/network/src/peer/testonly.rs +++ b/chain/network/src/peer/testonly.rs @@ -6,13 +6,14 @@ use crate::network_protocol::{ Edge, PartialEdgeInfo, PeerInfo, PeerMessage, RawRoutedMessage, RoutedMessageBody, RoutedMessageV2, RoutingTableUpdate, }; -use crate::peer::peer_actor::{PeerActor, StreamConfig}; +use crate::peer::peer_actor::{ClosingReason, PeerActor}; use crate::peer_manager::network_state::NetworkState; use crate::peer_manager::peer_manager_actor; use crate::private_actix::{PeerRequestResult, RegisterPeerResponse, SendMessage}; use crate::private_actix::{PeerToManagerMsg, PeerToManagerMsgResp}; use crate::routing::routing_table_view::RoutingTableView; use crate::store; +use crate::tcp; use crate::testonly::actix::ActixSystem; use crate::testonly::fake_client; use crate::time; @@ -21,7 +22,6 @@ use actix::{Actor, Context, Handler}; use near_crypto::{InMemorySigner, Signature}; use near_primitives::network::PeerId; use std::sync::Arc; -use tokio::net::{TcpListener, TcpStream}; use tracing::Span; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -29,7 +29,6 @@ pub struct PeerConfig { pub chain: Arc, pub network: NetworkConfig, pub peers: Vec, - pub start_handshake_with: Option, pub force_encoding: Option, /// If both start_handshake_with and nonce are set, PeerActor /// will use this nonce in the handshake. 
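// Sketch (illustrative, not part of the patch): the stream.rs error changes above only extend the `#[error(...)]` format strings with `{0}`, so the wrapped source error shows up in the `Display` output as well as through `Error::source()`. The same pattern in isolation (assumes the `thiserror` crate, which this module already depends on):
use std::io;

#[derive(thiserror::Error, Debug)]
enum SendError {
    #[error("IO error: {0}")]
    IO(#[source] io::Error),
}

fn main() {
    let err = SendError::IO(io::Error::new(io::ErrorKind::BrokenPipe, "pipe closed"));
    // Prints "IO error: pipe closed" rather than just "IO error".
    println!("{err}");
}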
@@ -57,11 +56,7 @@ impl PeerConfig { #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) enum Event { - HandshakeDone(Edge), RoutingTable(RoutingTableUpdate), - RequestUpdateNonce(PartialEdgeInfo), - ResponseUpdateNonce(Edge), - PeersResponse(Vec), Client(fake_client::Event), Network(peer_manager_actor::Event), } @@ -81,22 +76,15 @@ impl Handler for FakePeerManagerActor { let msg_type: &str = (&msg).into(); println!("{}: PeerManager message {}", self.cfg.id(), msg_type); match msg { - PeerToManagerMsg::RegisterPeer(msg) => { - self.event_sink.push(Event::HandshakeDone(msg.connection.edge.clone())); + PeerToManagerMsg::RegisterPeer(..) => { PeerToManagerMsgResp::RegisterPeer(RegisterPeerResponse::Accept) } PeerToManagerMsg::SyncRoutingTable { routing_table_update, .. } => { self.event_sink.push(Event::RoutingTable(routing_table_update)); PeerToManagerMsgResp::Empty } - PeerToManagerMsg::RequestUpdateNonce(_, edge) => { - self.event_sink.push(Event::RequestUpdateNonce(edge)); - PeerToManagerMsgResp::Empty - } - PeerToManagerMsg::ResponseUpdateNonce(edge) => { - self.event_sink.push(Event::ResponseUpdateNonce(edge)); - PeerToManagerMsgResp::Empty - } + PeerToManagerMsg::RequestUpdateNonce(..) => PeerToManagerMsgResp::Empty, + PeerToManagerMsg::ResponseUpdateNonce(..) => PeerToManagerMsgResp::Empty, PeerToManagerMsg::PeersRequest(_) => { // PeerActor would panic if we returned a different response. // This also triggers sending a message to the peer. @@ -104,20 +92,17 @@ impl Handler for FakePeerManagerActor { peers: self.cfg.peers.clone(), }) } - PeerToManagerMsg::PeersResponse(resp) => { - self.event_sink.push(Event::PeersResponse(resp.peers)); - PeerToManagerMsgResp::Empty - } + PeerToManagerMsg::PeersResponse(..) => PeerToManagerMsgResp::Empty, PeerToManagerMsg::Unregister(_) => PeerToManagerMsgResp::Empty, _ => panic!("unsupported message"), } } } -pub struct PeerHandle { - pub(crate) cfg: Arc, +pub(crate) struct PeerHandle { + pub cfg: Arc, actix: ActixSystem, - pub(crate) events: broadcast::Receiver, + pub events: broadcast::Receiver, } impl PeerHandle { @@ -132,18 +117,18 @@ impl PeerHandle { pub async fn complete_handshake(&mut self) -> Edge { self.events .recv_until(|ev| match ev { - Event::HandshakeDone(edge) => Some(edge), - Event::Network(peer_manager_actor::Event::ConnectionClosed(_)) => { - panic!("handshake failed") + Event::Network(peer_manager_actor::Event::HandshakeCompleted(ev)) => Some(ev.edge), + Event::Network(peer_manager_actor::Event::ConnectionClosed(ev)) => { + panic!("handshake failed: {}", ev.reason) } _ => None, }) .await } - pub async fn fail_handshake(&mut self) { + pub async fn fail_handshake(&mut self) -> ClosingReason { self.events .recv_until(|ev| match ev { - Event::Network(peer_manager_actor::Event::ConnectionClosed(_)) => Some(()), + Event::Network(peer_manager_actor::Event::ConnectionClosed(ev)) => Some(ev.reason), // HandshakeDone means that handshake succeeded locally, // but in case this is an inbound connection, it can still // fail on the other side. Therefore we cannot panic on HandshakeDone. 
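// Sketch (illustrative, not part of the patch): the tests below replace the removed `PeerHandle::start_connection` helper with `tcp::Stream::loopback`. The `tcp` module itself is not shown in this diff; presumably the loopback constructor does essentially what the old helper did: bind an ephemeral listener on 127.0.0.1, connect to it, and return both ends. A rough, assumption-level shape of such a helper over plain tokio streams:
use tokio::net::{TcpListener, TcpStream};

async fn loopback_pair() -> std::io::Result<(TcpStream, TcpStream)> {
    let listener = TcpListener::bind("127.0.0.1:0").await?;
    let addr = listener.local_addr()?;
    // Drive the connect and the accept concurrently.
    let (connect, accept) = tokio::join!(TcpStream::connect(addr), listener.accept());
    let outbound = connect?;
    let (inbound, _peer_addr) = accept?;
    Ok((outbound, inbound))
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (_outbound, _inbound) = loopback_pair().await?;
    Ok(())
}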
@@ -169,7 +154,7 @@ impl PeerHandle { pub async fn start_endpoint( clock: time::Clock, cfg: PeerConfig, - stream: TcpStream, + stream: tcp::Stream, ) -> PeerHandle { let cfg = Arc::new(cfg); let cfg_ = cfg.clone(); @@ -180,7 +165,9 @@ impl PeerHandle { let store = store::Store::from(near_store::db::TestDB::new()); let routing_table_view = RoutingTableView::new(store, cfg.id()); // WARNING: this is a hack to make PeerActor use a specific nonce - if let (Some(nonce), Some(peer_id)) = (&cfg.nonce, &cfg.start_handshake_with) { + if let (Some(nonce), tcp::StreamType::Outbound { peer_id }) = + (&cfg.nonce, &stream.type_) + { routing_table_view.add_local_edges(&[Edge::new( cfg.id(), peer_id.clone(), @@ -200,29 +187,9 @@ impl PeerHandle { routing_table_view, demux::RateLimit { qps: 100., burst: 1 }, )); - PeerActor::spawn( - clock, - stream, - match &cfg.start_handshake_with { - None => StreamConfig::Inbound, - Some(id) => StreamConfig::Outbound { peer_id: id.clone() }, - }, - cfg.force_encoding, - network_state, - ) - .unwrap() + PeerActor::spawn(clock, stream, cfg.force_encoding, network_state).unwrap() }) .await; Self { actix, cfg: cfg_, events: recv } } - - pub async fn start_connection() -> (TcpStream, TcpStream) { - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let connect_future = TcpStream::connect(listener.local_addr().unwrap()); - let accept_future = listener.accept(); - let (connect_result, accept_result) = tokio::join!(connect_future, accept_future); - let outbound_stream = connect_result.unwrap(); - let (inbound_stream, _) = accept_result.unwrap(); - (outbound_stream, inbound_stream) - } } diff --git a/chain/network/src/peer/tests/communication.rs b/chain/network/src/peer/tests/communication.rs index fa6ab30d30c..6c4b0be3494 100644 --- a/chain/network/src/peer/tests/communication.rs +++ b/chain/network/src/peer/tests/communication.rs @@ -3,6 +3,7 @@ use crate::network_protocol::Encoding; use crate::network_protocol::{Handshake, HandshakeFailureReason, PeerMessage, RoutedMessageBody}; use crate::peer::testonly::{Event, PeerConfig, PeerHandle}; use crate::peer_manager::peer_manager_actor::Event as PME; +use crate::tcp; use crate::testonly::fake_client::Event as CE; use crate::testonly::make_rng; use crate::testonly::stream::Stream; @@ -29,7 +30,6 @@ async fn test_peer_communication( network: chain.make_config(&mut rng), peers: (0..5).map(|_| data::make_peer_info(&mut rng)).collect(), force_encoding: inbound_encoding, - start_handshake_with: None, nonce: None, }; let outbound_cfg = PeerConfig { @@ -37,11 +37,9 @@ async fn test_peer_communication( network: chain.make_config(&mut rng), peers: (0..5).map(|_| data::make_peer_info(&mut rng)).collect(), force_encoding: outbound_encoding, - start_handshake_with: Some(inbound_cfg.id()), nonce: None, }; - - let (outbound_stream, inbound_stream) = PeerHandle::start_connection().await; + let (outbound_stream, inbound_stream) = tcp::Stream::loopback(inbound_cfg.id()).await; let mut inbound = PeerHandle::start_endpoint(clock.clock(), inbound_cfg, inbound_stream).await; let mut outbound = PeerHandle::start_endpoint(clock.clock(), outbound_cfg, outbound_stream).await; @@ -60,60 +58,64 @@ async fn test_peer_communication( }; let message_processed = |ev| match ev { + Event::Network(PME::MessageProcessed(PeerMessage::SyncAccountsData(_))) => None, Event::Network(PME::MessageProcessed(msg)) => Some(msg), _ => None, }; // RequestUpdateNonce - let want = data::make_partial_edge(&mut rng); - 
outbound.send(PeerMessage::RequestUpdateNonce(want.clone())).await; - let got = inbound.events.recv_until(filter).await; - assert_eq!(Event::RequestUpdateNonce(want), got); + let mut events = inbound.events.from_now(); + let want = PeerMessage::RequestUpdateNonce(data::make_partial_edge(&mut rng)); + outbound.send(want.clone()).await; + assert_eq!(want, events.recv_until(message_processed).await); // ReponseUpdateNonce + let mut events = inbound.events.from_now(); let a = data::make_signer(&mut rng); let b = data::make_signer(&mut rng); - let want = data::make_edge(&a, &b); - outbound.send(PeerMessage::ResponseUpdateNonce(want.clone())).await; - assert_eq!(Event::ResponseUpdateNonce(want), inbound.events.recv_until(filter).await); + let want = PeerMessage::ResponseUpdateNonce(data::make_edge(&a, &b)); + outbound.send(want.clone()).await; + assert_eq!(want, events.recv_until(message_processed).await); // PeersRequest -> PeersResponse // This test is different from the rest, because we cannot skip sending the response back. - let want = inbound.cfg.peers.clone(); + let mut events = outbound.events.from_now(); + let want = PeerMessage::PeersResponse(inbound.cfg.peers.clone()); outbound.send(PeerMessage::PeersRequest).await; - assert_eq!(Event::PeersResponse(want), outbound.events.recv_until(filter).await); + assert_eq!(want, events.recv_until(message_processed).await); // BlockRequest + let mut events = inbound.events.from_now(); let want = chain.blocks[5].hash().clone(); outbound.send(PeerMessage::BlockRequest(want.clone())).await; - assert_eq!(Event::Client(CE::BlockRequest(want)), inbound.events.recv_until(filter).await); + assert_eq!(Event::Client(CE::BlockRequest(want)), events.recv_until(filter).await); // Block - let want = chain.blocks[5].clone(); - let want = PeerMessage::Block(want); + let mut events = inbound.events.from_now(); + let want = PeerMessage::Block(chain.blocks[5].clone()); outbound.send(want.clone()).await; - assert_eq!(want, inbound.events.recv_until(message_processed).await); + assert_eq!(want, events.recv_until(message_processed).await); // BlockHeadersRequest + let mut events = inbound.events.from_now(); let want: Vec<_> = chain.blocks.iter().map(|b| b.hash().clone()).collect(); outbound.send(PeerMessage::BlockHeadersRequest(want.clone())).await; - assert_eq!( - Event::Client(CE::BlockHeadersRequest(want)), - inbound.events.recv_until(filter).await - ); + assert_eq!(Event::Client(CE::BlockHeadersRequest(want)), events.recv_until(filter).await); // BlockHeaders - let want = chain.get_block_headers(); - let want = PeerMessage::BlockHeaders(want); + let mut events = inbound.events.from_now(); + let want = PeerMessage::BlockHeaders(chain.get_block_headers()); outbound.send(want.clone()).await; - assert_eq!(want, inbound.events.recv_until(message_processed).await); + assert_eq!(want, events.recv_until(message_processed).await); // SyncRoutingTable + let mut events = inbound.events.from_now(); let want = data::make_routing_table(&mut rng); outbound.send(PeerMessage::SyncRoutingTable(want.clone())).await; - assert_eq!(Event::RoutingTable(want), inbound.events.recv().await); + assert_eq!(Event::RoutingTable(want), events.recv().await); // PartialEncodedChunkRequest + let mut events = inbound.events.from_now(); let want = Box::new(outbound.routed_message( RoutedMessageBody::PartialEncodedChunkRequest(PartialEncodedChunkRequestMsg { chunk_hash: chain.blocks[5].chunks()[2].chunk_hash(), @@ -126,9 +128,10 @@ async fn test_peer_communication( )); let want = 
PeerMessage::Routed(want); outbound.send(want.clone()).await; - assert_eq!(want, inbound.events.recv_until(message_processed).await); + assert_eq!(want, events.recv_until(message_processed).await); // PartialEncodedChunkResponse + let mut events = inbound.events.from_now(); let want_hash = chain.blocks[3].chunks()[0].chunk_hash(); let want_parts = data::make_chunk_parts(chain.chunks[&want_hash].clone()); let want = PeerMessage::Routed(Box::new(outbound.routed_message( @@ -142,35 +145,40 @@ async fn test_peer_communication( None, // TODO(gprusak): this should be clock.now_utc(), once borsh support is dropped. ))); outbound.send(want.clone()).await; - assert_eq!(want, inbound.events.recv_until(message_processed).await); + assert_eq!(want, events.recv_until(message_processed).await); // Transaction + let mut events = inbound.events.from_now(); let want = data::make_signed_transaction(&mut rng); let want = PeerMessage::Transaction(want); outbound.send(want.clone()).await; - assert_eq!(want, inbound.events.recv_until(message_processed).await); + assert_eq!(want, events.recv_until(message_processed).await); // Challenge + let mut events = inbound.events.from_now(); let want = PeerMessage::Challenge(data::make_challenge(&mut rng)); outbound.send(want.clone()).await; - assert_eq!(want, inbound.events.recv_until(message_processed).await); + assert_eq!(want, events.recv_until(message_processed).await); // EpochSyncRequest + let mut events = inbound.events.from_now(); let want = EpochId(chain.blocks[1].hash().clone()); outbound.send(PeerMessage::EpochSyncRequest(want.clone())).await; - assert_eq!(Event::Client(CE::EpochSyncRequest(want)), inbound.events.recv_until(filter).await); + assert_eq!(Event::Client(CE::EpochSyncRequest(want)), events.recv_until(filter).await); // EpochSyncResponse + let mut events = inbound.events.from_now(); let want = PeerMessage::EpochSyncResponse(Box::new(EpochSyncResponse::UpToDate)); outbound.send(want.clone()).await; - assert_eq!(want, inbound.events.recv_until(message_processed).await); + assert_eq!(want, events.recv_until(message_processed).await); // EpochSyncFinalizationRequest + let mut events = inbound.events.from_now(); let want = EpochId(chain.blocks[1].hash().clone()); outbound.send(PeerMessage::EpochSyncFinalizationRequest(want.clone())).await; assert_eq!( Event::Client(CE::EpochSyncFinalizationRequest(want)), - inbound.events.recv_until(filter).await + events.recv_until(filter).await ); // TODO: @@ -210,7 +218,6 @@ async fn test_handshake(outbound_encoding: Option, inbound_encoding: O chain: chain.clone(), peers: (0..5).map(|_| data::make_peer_info(&mut rng)).collect(), force_encoding: inbound_encoding, - start_handshake_with: None, nonce: None, }; let outbound_cfg = PeerConfig { @@ -218,11 +225,11 @@ async fn test_handshake(outbound_encoding: Option, inbound_encoding: O chain: chain.clone(), peers: (0..5).map(|_| data::make_peer_info(&mut rng)).collect(), force_encoding: outbound_encoding, - start_handshake_with: None, nonce: None, }; - let (outbound_stream, inbound_stream) = PeerHandle::start_connection().await; + let (outbound_stream, inbound_stream) = tcp::Stream::loopback(inbound_cfg.id()).await; let inbound = PeerHandle::start_endpoint(clock.clock(), inbound_cfg, inbound_stream).await; + let outbound_port = outbound_stream.local_addr.port(); let mut outbound = Stream::new(outbound_encoding, outbound_stream); // Send too old PROTOCOL_VERSION, expect ProtocolVersionMismatch @@ -231,7 +238,7 @@ async fn test_handshake(outbound_encoding: Option, 
inbound_encoding: O oldest_supported_version: PEER_MIN_ALLOWED_PROTOCOL_VERSION - 1, sender_peer_id: outbound_cfg.id(), target_peer_id: inbound.cfg.id(), - sender_listen_port: Some(outbound.local_addr.port()), + sender_listen_port: Some(outbound_port), sender_chain_info: outbound_cfg.chain.get_peer_chain_info(), partial_edge_info: outbound_cfg.partial_edge_info(&inbound.cfg.id(), 1), }; diff --git a/chain/network/src/peer/tests/stream.rs b/chain/network/src/peer/tests/stream.rs index 112c0ef26aa..ce9961b7b43 100644 --- a/chain/network/src/peer/tests/stream.rs +++ b/chain/network/src/peer/tests/stream.rs @@ -1,5 +1,7 @@ use crate::actix::ActixSystem; +use crate::network_protocol::testonly as data; use crate::peer::stream; +use crate::tcp; use crate::testonly::make_rng; use actix::Actor as _; use actix::ActorContext as _; @@ -47,14 +49,13 @@ struct Handler { } impl Actor { - async fn spawn(s: tokio::net::TcpStream) -> Handler { + async fn spawn(s: tcp::Stream) -> Handler { let (queue_send, queue_recv) = mpsc::unbounded_channel(); Handler { queue_recv, system: ActixSystem::spawn(|| { Actor::create(|ctx| { - let stream = - stream::FramedStream::spawn(ctx, s.peer_addr().unwrap(), s, Arc::default()); + let stream = stream::FramedStream::spawn(ctx, s, Arc::default()); Self { stream, queue_send } }) }) @@ -65,15 +66,11 @@ impl Actor { #[tokio::test] async fn send_recv() { - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let (s1, s2) = tokio::join!( - tokio::net::TcpStream::connect(listener.local_addr().unwrap()), - listener.accept(), - ); - let a1 = Actor::spawn(s1.unwrap()).await; - let mut a2 = Actor::spawn(s2.unwrap().0).await; - let mut rng = make_rng(98324532); + let (s1, s2) = tcp::Stream::loopback(data::make_peer_id(&mut rng)).await; + let a1 = Actor::spawn(s1).await; + let mut a2 = Actor::spawn(s2).await; + for _ in 0..5 { let n = rng.gen_range(1..10); let msgs: Vec<_> = (0..n) diff --git a/chain/network/src/peer_manager/connection/mod.rs b/chain/network/src/peer_manager/connection/mod.rs index d08aac386af..62d7190176e 100644 --- a/chain/network/src/peer_manager/connection/mod.rs +++ b/chain/network/src/peer_manager/connection/mod.rs @@ -229,12 +229,14 @@ impl Drop for OutboundHandshakePermit { #[derive(Clone)] pub(crate) struct Pool(Arc>); -#[derive(thiserror::Error, Debug)] +#[derive(thiserror::Error, Clone, Copy, Debug, PartialEq, Eq)] pub(crate) enum PoolError { #[error("already connected to this peer")] AlreadyConnected, #[error("already started another outbound connection to this peer")] AlreadyStartedConnecting, + #[error("loop connections are not allowed")] + LoopConnection, } impl Pool { @@ -253,6 +255,9 @@ impl Pool { pub fn insert_ready(&self, peer: Arc) -> Result<(), PoolError> { self.0.update(move |pool| { let id = &peer.peer_info.id; + if id == &pool.me { + return Err(PoolError::LoopConnection); + } if pool.ready.contains_key(id) { return Err(PoolError::AlreadyConnected); } @@ -282,6 +287,9 @@ impl Pool { pub fn start_outbound(&self, peer_id: PeerId) -> Result { self.0.update(move |pool| { + if peer_id == pool.me { + return Err(PoolError::LoopConnection); + } if pool.ready.contains_key(&peer_id) { return Err(PoolError::AlreadyConnected); } diff --git a/chain/network/src/peer_manager/connection/tests.rs b/chain/network/src/peer_manager/connection/tests.rs index e3e5a87ff82..0ec6f16e3e8 100644 --- a/chain/network/src/peer_manager/connection/tests.rs +++ b/chain/network/src/peer_manager/connection/tests.rs @@ -1,5 +1,8 @@ use 
crate::network_protocol::testonly as data; +use crate::peer::peer_actor::ClosingReason; use crate::peer_manager; +use crate::peer_manager::connection; +use crate::private_actix::RegisterPeerError; use crate::testonly::make_rng; use crate::time; use near_o11y::testonly::init_test_logger; @@ -25,25 +28,29 @@ async fn connection_tie_break() { .await; // pm.id is lower - tracing::debug!("PHASE1"); let outbound_conn = pm.start_outbound(chain.clone(), cfgs[2].clone()).await; let inbound_conn = pm.start_inbound(chain.clone(), cfgs[2].clone()).await; // inbound should be rejected, outbound accepted. - inbound_conn.fail_handshake(&clock.clock()).await; + assert_eq!( + ClosingReason::RejectedByPeerManager(RegisterPeerError::PoolError( + connection::PoolError::AlreadyStartedConnecting + )), + inbound_conn.manager_fail_handshake(&clock.clock()).await, + ); outbound_conn.handshake(&clock.clock()).await; // pm.id is higher - tracing::debug!("PHASE2"); let outbound_conn = pm.start_outbound(chain.clone(), cfgs[0].clone()).await; let inbound_conn = pm.start_inbound(chain.clone(), cfgs[0].clone()).await; // inbound should be accepted, outbound rejected by PM. - tracing::debug!("PHASE2b"); let inbound = inbound_conn.handshake(&clock.clock()).await; - tracing::debug!("PHASE2c"); - outbound_conn.fail_handshake(&clock.clock()).await; + assert_eq!( + ClosingReason::RejectedByPeerManager(RegisterPeerError::PoolError( + connection::PoolError::AlreadyConnected + )), + outbound_conn.manager_fail_handshake(&clock.clock()).await, + ); drop(inbound); - - tracing::debug!("PHASE3"); } #[tokio::test] @@ -67,7 +74,10 @@ async fn duplicate_connections() { let conn1 = pm.start_outbound(chain.clone(), cfg.clone()).await; let conn2 = pm.start_outbound(chain.clone(), cfg.clone()).await; // conn2 shouldn't even be started, so it should fail before conn1 completes. - conn2.fail_handshake(&clock.clock()).await; + assert_eq!( + ClosingReason::OutboundNotAllowed(connection::PoolError::AlreadyStartedConnecting), + conn2.manager_fail_handshake(&clock.clock()).await, + ); conn1.handshake(&clock.clock()).await; // Double inbound. @@ -77,7 +87,12 @@ async fn duplicate_connections() { // Second inbound should be rejected. conn1 handshake has to // be completed first though, otherwise we would have a race condition. let conn2 = pm.start_inbound(chain.clone(), cfg.clone()).await; - conn2.fail_handshake(&clock.clock()).await; + assert_eq!( + ClosingReason::RejectedByPeerManager(RegisterPeerError::PoolError( + connection::PoolError::AlreadyConnected + )), + conn2.manager_fail_handshake(&clock.clock()).await, + ); drop(conn1); // Inbound then outbound. @@ -85,7 +100,10 @@ async fn duplicate_connections() { let conn1 = pm.start_inbound(chain.clone(), cfg.clone()).await; let conn1 = conn1.handshake(&clock.clock()).await; let conn2 = pm.start_outbound(chain.clone(), cfg.clone()).await; - conn2.fail_handshake(&clock.clock()).await; + assert_eq!( + ClosingReason::OutboundNotAllowed(connection::PoolError::AlreadyConnected), + conn2.manager_fail_handshake(&clock.clock()).await, + ); drop(conn1); // Outbound then inbound. 
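// Sketch (illustrative, not part of the patch): the connection::Pool changes above reject loop connections (a peer id equal to our own) and duplicates before inserting. A simplified model of that admission logic, with `u64` ids standing in for `PeerId` and a plain HashSet for the pool:
use std::collections::HashSet;

#[derive(Debug, PartialEq, Eq)]
enum PoolError {
    AlreadyConnected,
    LoopConnection,
}

struct Pool {
    me: u64,
    ready: HashSet<u64>,
}

impl Pool {
    fn insert_ready(&mut self, id: u64) -> Result<(), PoolError> {
        if id == self.me {
            return Err(PoolError::LoopConnection);
        }
        if self.ready.contains(&id) {
            return Err(PoolError::AlreadyConnected);
        }
        self.ready.insert(id);
        Ok(())
    }
}

fn main() {
    let mut pool = Pool { me: 0, ready: HashSet::new() };
    assert_eq!(pool.insert_ready(0), Err(PoolError::LoopConnection));
    assert_eq!(pool.insert_ready(1), Ok(()));
    assert_eq!(pool.insert_ready(1), Err(PoolError::AlreadyConnected));
}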
@@ -93,6 +111,11 @@ async fn duplicate_connections() { let conn1 = pm.start_outbound(chain.clone(), cfg.clone()).await; let conn1 = conn1.handshake(&clock.clock()).await; let conn2 = pm.start_inbound(chain.clone(), cfg.clone()).await; - conn2.fail_handshake(&clock.clock()).await; + assert_eq!( + ClosingReason::RejectedByPeerManager(RegisterPeerError::PoolError( + connection::PoolError::AlreadyConnected + )), + conn2.manager_fail_handshake(&clock.clock()).await, + ); drop(conn1); } diff --git a/chain/network/src/peer_manager/network_state.rs b/chain/network/src/peer_manager/network_state.rs index 04e8775ceeb..f87ca550b69 100644 --- a/chain/network/src/peer_manager/network_state.rs +++ b/chain/network/src/peer_manager/network_state.rs @@ -2,10 +2,9 @@ use crate::accounts_data; use crate::concurrency::demux; use crate::config; use crate::network_protocol::{ - AccountOrPeerIdOrHash, PartialEdgeInfo, PeerIdOrHash, PeerInfo, PeerMessage, Ping, Pong, + AccountOrPeerIdOrHash, PartialEdgeInfo, PeerIdOrHash, PeerMessage, Ping, Pong, RawRoutedMessage, RoutedMessageBody, RoutedMessageV2, }; -use crate::peer::peer_actor::{PeerActor, StreamConfig}; use crate::peer_manager::connection; use crate::private_actix::PeerToManagerMsg; use crate::routing::routing_table_view::RoutingTableView; @@ -19,8 +18,7 @@ use near_primitives::hash::CryptoHash; use near_primitives::network::PeerId; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use tokio::net::TcpStream; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, trace}; /// How often to request peers from active peers. const REQUEST_PEERS_INTERVAL: time::Duration = time::Duration::milliseconds(60_000); @@ -107,58 +105,6 @@ impl NetworkState { PartialEdgeInfo::new(&self.config.node_id(), peer1, nonce, &self.config.node_key) } - /// Connects peer with given TcpStream. - /// It will fail (and log) if we have too many connections already, - /// or if the peer drops the connection in the meantime. - fn spawn_peer_actor( - self: Arc, - clock: &time::Clock, - stream: TcpStream, - stream_cfg: StreamConfig, - ) { - if let Err(err) = PeerActor::spawn(clock.clone(), stream, stream_cfg, None, self.clone()) { - tracing::info!(target:"network", ?err, "PeerActor::spawn()"); - }; - } - - pub async fn spawn_inbound(self: Arc, clock: &time::Clock, stream: TcpStream) { - self.spawn_peer_actor(clock, stream, StreamConfig::Inbound); - } - - pub async fn spawn_outbound(self: Arc, clock: time::Clock, peer_info: PeerInfo) { - let addr = match peer_info.addr { - Some(addr) => addr, - None => { - warn!(target: "network", ?peer_info, "Trying to connect to peer with no public address"); - return; - } - }; - // The `connect` may take several minutes. This happens when the - // `SYN` packet for establishing a TCP connection gets silently - // dropped, in which case the default TCP timeout is applied. That's - // too long for us, so we shorten it to one second. - // - // Why exactly a second? It was hard-coded in a library we used - // before, so we keep it to preserve behavior. Removing the timeout - // completely was observed to break stuff for real on the testnet. 
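// Sketch (illustrative, not part of the patch): the outbound-connect logic is removed from NetworkState here; the removed code just below shows the old 1-second connect cap that the comment above explains. Presumably the new `tcp::Stream::connect` (its module is not part of this section) keeps the same behavior. The shape of such a wrapper, as an assumption-level example:
use std::net::SocketAddr;
use tokio::net::TcpStream;

async fn connect_with_timeout(addr: SocketAddr) -> anyhow::Result<TcpStream> {
    // Outer `?` handles the elapsed timeout, inner `?` the connect failure.
    let stream =
        tokio::time::timeout(std::time::Duration::from_secs(1), TcpStream::connect(addr))
            .await??;
    Ok(stream)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Example target address; assumes something is listening there.
    let _stream = connect_with_timeout("127.0.0.1:24567".parse()?).await?;
    Ok(())
}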
- let stream = - match tokio::time::timeout(std::time::Duration::from_secs(1), TcpStream::connect(addr)) - .await - { - Ok(Ok(it)) => it, - Ok(Err(err)) => { - info!(target: "network", ?addr, ?err, "Error connecting to"); - return; - } - Err(err) => { - info!(target: "network", ?addr, ?err, "Error connecting to"); - return; - } - }; - debug!(target: "network", ?peer_info, "Connecting"); - self.spawn_peer_actor(&clock, stream, StreamConfig::Outbound { peer_id: peer_info.id }); - } - // Determine if the given target is referring to us. pub fn message_for_me(&self, target: &PeerIdOrHash) -> bool { let my_peer_id = self.config.node_id(); diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index ac249d1fbde..8fee6927cfc 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -4,6 +4,7 @@ use crate::network_protocol::{ Ping, Pong, RawRoutedMessage, RoutedMessageBody, RoutingTableUpdate, StateResponseInfo, SyncAccountsData, }; +use crate::peer::peer_actor::PeerActor; use crate::peer_manager::connection; use crate::peer_manager::network_state::NetworkState; use crate::peer_manager::peer_store::PeerStore; @@ -17,13 +18,15 @@ use crate::routing::edge_validator_actor::EdgeValidatorHelper; use crate::routing::routing_table_view::RoutingTableView; use crate::stats::metrics; use crate::store; +use crate::tcp; use crate::time; use crate::types::{ Ban, ConnectedPeerInfo, FullPeerInfo, GetNetworkInfo, KnownPeerStatus, KnownProducer, NetworkClientMessages, NetworkInfo, NetworkRequests, NetworkResponses, - NetworkViewClientMessages, NetworkViewClientResponses, OutboundTcpConnect, - PeerManagerMessageRequest, PeerManagerMessageResponse, PeerType, ReasonForBan, SetChainInfo, + NetworkViewClientMessages, NetworkViewClientResponses, PeerManagerMessageRequest, + PeerManagerMessageResponse, PeerType, ReasonForBan, SetChainInfo, }; +use actix::fut::future::wrap_future; use actix::{ Actor, ActorFutureExt, Addr, Arbiter, AsyncContext, Context, ContextFutureSpawner, Handler, Recipient, Running, WrapFuture, @@ -42,7 +45,6 @@ use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::sync::atomic::Ordering; use std::sync::Arc; -use tokio::net::TcpListener; use tracing::{debug, error, info, trace, warn, Instrument}; /// How much time to wait (in milliseconds) after we send update nonce request before disconnecting. @@ -162,7 +164,6 @@ pub enum Event { ServerStarted, RoutedMessageDropped, RoutingTableUpdate(Arc), - PeerRegistered(PeerInfo), Ping(Ping), Pong(Pong), SetChainInfo, @@ -179,9 +180,11 @@ pub enum Event { // feel free to add support for more. MessageProcessed(PeerMessage), // Reported when a handshake has been started. - PeerActorStarted(SocketAddr), + HandshakeStarted(crate::peer::peer_actor::HandshakeStartedEvent), + // Reported when a handshake has been successfully completed. + HandshakeCompleted(crate::peer::peer_actor::HandshakeCompletedEvent), // Reported when the TCP connection has been closed. 
- ConnectionClosed(SocketAddr), + ConnectionClosed(crate::peer::peer_actor::ConnectionClosedEvent), } impl Actor for PeerManagerActor { @@ -193,32 +196,30 @@ impl Actor for PeerManagerActor { debug!(target: "network", at = ?server_addr, "starting public server"); let clock = self.clock.clone(); let state = self.state.clone(); - ctx.spawn( - async move { - let listener = match TcpListener::bind(server_addr).await { - Ok(it) => it, - Err(e) => { - panic!( - "failed to start listening on server_addr={:?} e={:?}", - server_addr, e - ); - } - }; - state.config.event_sink.push(Event::ServerStarted); - loop { - if let Ok((conn, client_addr)) = listener.accept().await { - // Always let the new peer to send a handshake message. - // Only then we can decide whether we should accept a connection. - // It is expected to be reasonably cheap: eventually, for TIER2 network - // we would like to exchange set of connected peers even without establishing - // a proper connection. - debug!(target: "network", from = ?client_addr, "got new connection"); - state.clone().spawn_inbound(&clock, conn).await; + ctx.spawn(wrap_future(async move { + let mut listener = match tcp::Listener::bind(server_addr).await { + Ok(it) => it, + Err(e) => { + panic!("failed to start listening on server_addr={server_addr:?} e={e:?}") + } + }; + state.config.event_sink.push(Event::ServerStarted); + loop { + if let Ok(stream) = listener.accept().await { + // Always let the new peer to send a handshake message. + // Only then we can decide whether we should accept a connection. + // It is expected to be reasonably cheap: eventually, for TIER2 network + // we would like to exchange set of connected peers even without establishing + // a proper connection. + debug!(target: "network", from = ?stream.peer_addr, "got new connection"); + if let Err(err) = + PeerActor::spawn(clock.clone(), stream, None, state.clone()) + { + tracing::info!(target:"network", ?err, "PeerActor::spawn()"); } } } - .into_actor(self), - ); + })); } // Periodically push network information to client. 
@@ -930,9 +931,19 @@ impl PeerManagerActor { interval = default_interval; } - ctx.notify(PeerManagerMessageRequest::OutboundTcpConnect(OutboundTcpConnect( - peer_info, - ))); + ctx.spawn(wrap_future({ + let state = self.state.clone(); + let clock = self.clock.clone(); + async move { + if let Err(err) = async { + let stream = tcp::Stream::connect(&peer_info).await.context("tcp::Stream::connect()")?; + PeerActor::spawn(clock,stream,None,state.clone()).context("PeerActor::spawn()")?; + anyhow::Ok(()) + }.await { + tracing::info!(target:"network", ?err, "failed to connect to {peer_info}"); + } + } + })); } else { self.state.ask_for_more_peers(&self.clock); } @@ -1371,7 +1382,6 @@ impl PeerManagerActor { if let Err(err) = self.register_peer(msg.connection.clone(), ctx) { return RegisterPeerResponse::Reject(RegisterPeerError::PoolError(err)); } - self.config.event_sink.push(Event::PeerRegistered(peer_info.clone())); RegisterPeerResponse::Accept } @@ -1425,10 +1435,13 @@ impl PeerManagerActor { ) } // TEST-ONLY - PeerManagerMessageRequest::OutboundTcpConnect(msg) => { - ctx.spawn( - self.state.clone().spawn_outbound(self.clock.clone(), msg.0).into_actor(self), - ); + PeerManagerMessageRequest::OutboundTcpConnect(stream) => { + let peer_addr = stream.peer_addr; + if let Err(err) = + PeerActor::spawn(self.clock.clone(), stream, None, self.state.clone()) + { + tracing::info!(target:"network", ?err, ?peer_addr, "spawn_outbound()"); + } PeerManagerMessageResponse::OutboundTcpConnect } // TEST-ONLY diff --git a/chain/network/src/peer_manager/testonly.rs b/chain/network/src/peer_manager/testonly.rs index c23ebd5e4bd..88029349dd8 100644 --- a/chain/network/src/peer_manager/testonly.rs +++ b/chain/network/src/peer_manager/testonly.rs @@ -5,13 +5,13 @@ use crate::network_protocol::{ Encoding, PeerAddr, PeerInfo, PeerMessage, SignedAccountData, SyncAccountsData, }; use crate::peer; +use crate::peer::peer_actor::ClosingReason; use crate::peer_manager::peer_manager_actor::Event as PME; +use crate::tcp; use crate::testonly::actix::ActixSystem; use crate::testonly::fake_client; use crate::time; -use crate::types::{ - ChainInfo, GetNetworkInfo, OutboundTcpConnect, PeerManagerMessageRequest, SetChainInfo, -}; +use crate::types::{ChainInfo, GetNetworkInfo, PeerManagerMessageRequest, SetChainInfo}; use crate::PeerManagerActor; use near_primitives::network::PeerId; use near_primitives::types::{AccountId, EpochId}; @@ -56,13 +56,13 @@ impl From<&Arc> for NormalAccountData { pub(crate) struct RawConnection { events: broadcast::Receiver, - stream: tokio::net::TcpStream, + stream: tcp::Stream, cfg: peer::testonly::PeerConfig, } impl RawConnection { pub async fn handshake(mut self, clock: &time::Clock) -> peer::testonly::PeerHandle { - let node_id = self.cfg.network.node_id(); + let stream_id = self.stream.id(); let mut peer = peer::testonly::PeerHandle::start_endpoint(clock.clone(), self.cfg, self.stream).await; @@ -72,17 +72,37 @@ impl RawConnection { // Wait for the peer manager to complete the handshake. self.events .recv_until(|ev| match ev { - Event::PeerManager(PME::PeerRegistered(info)) if node_id == info.id => Some(()), + Event::PeerManager(PME::HandshakeCompleted(ev)) if ev.stream_id == stream_id => { + Some(()) + } + Event::PeerManager(PME::ConnectionClosed(ev)) if ev.stream_id == stream_id => { + panic!("handshake aborted: {}", ev.reason) + } _ => None, }) .await; peer } - pub async fn fail_handshake(self, clock: &time::Clock) { - let mut peer = + // Try to perform a handshake. 
PeerManager is expected to reject the handshake. + pub async fn manager_fail_handshake(mut self, clock: &time::Clock) -> ClosingReason { + let stream_id = self.stream.id(); + let peer = peer::testonly::PeerHandle::start_endpoint(clock.clone(), self.cfg, self.stream).await; - peer.fail_handshake().await; + let reason = self + .events + .recv_until(|ev| match ev { + Event::PeerManager(PME::ConnectionClosed(ev)) if ev.stream_id == stream_id => { + Some(ev.reason) + } + Event::PeerManager(PME::HandshakeCompleted(ev)) if ev.stream_id == stream_id => { + panic!("PeerManager accepted the handshake") + } + _ => None, + }) + .await; + drop(peer); + reason } } @@ -96,17 +116,18 @@ impl ActorHandler { } pub async fn connect_to(&self, peer_info: &PeerInfo) { + let stream = tcp::Stream::connect(peer_info).await.unwrap(); let mut events = self.events.from_now(); - self.actix - .addr - .send(PeerManagerMessageRequest::OutboundTcpConnect(OutboundTcpConnect( - peer_info.clone(), - ))) - .await - .unwrap(); + let stream_id = stream.id(); + self.actix.addr.do_send(PeerManagerMessageRequest::OutboundTcpConnect(stream)); events .recv_until(|ev| match &ev { - Event::PeerManager(PME::PeerRegistered(info)) if peer_info == info => Some(()), + Event::PeerManager(PME::HandshakeCompleted(ev)) if ev.stream_id == stream_id => { + Some(()) + } + Event::PeerManager(PME::ConnectionClosed(ev)) if ev.stream_id == stream_id => { + panic!("PeerManager accepted the handshake") + } _ => None, }) .await; @@ -121,18 +142,17 @@ impl ActorHandler { // 1. reserve a TCP port // 2. snapshot event stream // 3. establish connection. - let socket = tokio::net::TcpSocket::new_v4().unwrap(); - socket.bind("127.0.0.1:0".parse().unwrap()).unwrap(); - let local_addr = socket.local_addr().unwrap(); + let socket = tcp::Socket::bind_v4(); let events = self.events.from_now(); + let stream = socket.connect(&self.peer_info()).await; + let stream_id = stream.id(); let conn = RawConnection { events, - stream: socket.connect(self.cfg.node_addr.unwrap()).await.unwrap(), + stream, cfg: peer::testonly::PeerConfig { network: network_cfg, chain, peers: vec![], - start_handshake_with: Some(PeerId::new(self.cfg.node_key.public_key())), force_encoding: Some(Encoding::Proto), nonce: None, }, @@ -142,16 +162,15 @@ impl ActorHandler { conn.events .clone() .recv_until(|ev| match ev { - Event::PeerManager(PME::PeerActorStarted(addr)) - | Event::PeerManager(PME::ConnectionClosed(addr)) - if addr == local_addr => - { + Event::PeerManager(PME::HandshakeStarted(ev)) if ev.stream_id == stream_id => { + Some(()) + } + Event::PeerManager(PME::ConnectionClosed(ev)) if ev.stream_id == stream_id => { Some(()) } _ => None, }) .await; - tracing::debug!("PHASE handshake started"); conn } @@ -160,39 +179,30 @@ impl ActorHandler { chain: Arc, network_cfg: config::NetworkConfig, ) -> RawConnection { - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let (outbound_stream, inbound_stream) = tcp::Stream::loopback(network_cfg.node_id()).await; + let stream_id = outbound_stream.id(); let events = self.events.from_now(); - let peer_info = PeerInfo { - id: network_cfg.node_id(), - addr: Some(listener.local_addr().unwrap()), - account_id: None, - }; - self.actix.addr.do_send(PeerManagerMessageRequest::OutboundTcpConnect(OutboundTcpConnect( - peer_info.clone(), - ))); - let (stream, _) = listener.accept().await.unwrap(); + self.actix.addr.do_send(PeerManagerMessageRequest::OutboundTcpConnect(outbound_stream)); let conn = RawConnection { events, - stream, + 
stream: inbound_stream, cfg: peer::testonly::PeerConfig { network: network_cfg, chain, peers: vec![], - start_handshake_with: None, force_encoding: Some(Encoding::Proto), nonce: None, }, }; // Wait until the handshake started or connection is closed. // The Handshake is not performed yet. - let local_addr = listener.local_addr().unwrap(); conn.events .clone() .recv_until(|ev| match ev { - Event::PeerManager(PME::PeerActorStarted(addr)) - | Event::PeerManager(PME::ConnectionClosed(addr)) - if addr == local_addr => - { + Event::PeerManager(PME::HandshakeStarted(ev)) if ev.stream_id == stream_id => { + Some(()) + } + Event::PeerManager(PME::ConnectionClosed(ev)) if ev.stream_id == stream_id => { Some(()) } _ => None, diff --git a/chain/network/src/peer_manager/tests.rs b/chain/network/src/peer_manager/tests.rs index 9ab94534ef8..6fef744da35 100644 --- a/chain/network/src/peer_manager/tests.rs +++ b/chain/network/src/peer_manager/tests.rs @@ -1,25 +1,29 @@ use crate::concurrency::demux; use crate::config; use crate::network_protocol::testonly as data; -use crate::network_protocol::{Encoding, PeerAddr, SyncAccountsData}; +use crate::network_protocol::{Encoding, Handshake, PartialEdgeInfo, PeerAddr, SyncAccountsData}; use crate::network_protocol::{Ping, RoutedMessageBody, EDGE_MIN_TIMESTAMP_NONCE}; use crate::peer; +use crate::peer::peer_actor::ClosingReason; use crate::peer_manager; +use crate::peer_manager::connection; use crate::peer_manager::network_state::LIMIT_PENDING_PEERS; use crate::peer_manager::peer_manager_actor::Event as PME; use crate::peer_manager::testonly::{Event, NormalAccountData}; +use crate::private_actix::RegisterPeerError; +use crate::tcp; +use crate::testonly::stream::Stream; use crate::testonly::{assert_is_superset, make_rng, AsSet as _}; use crate::time; use crate::types::{PeerMessage, RoutingTableUpdate}; use itertools::Itertools; use near_o11y::testonly::init_test_logger; -use near_primitives::network::PeerId; +use near_primitives::version::PROTOCOL_VERSION; use pretty_assertions::assert_eq; use rand::seq::SliceRandom as _; use rand::Rng as _; use std::collections::HashSet; use std::sync::Arc; -use tokio::net::TcpStream; // After the initial exchange, all subsequent SyncRoutingTable messages are // expected to contain only the diff of the known data. 
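The tests below all rely on the pattern established in testonly.rs above: record stream.id() before handing the tcp::Stream over, then filter the broadcast event stream by that StreamId to tell this connection's events apart from everyone else's. A condensed sketch of that pattern follows; the function name wait_for_handshake is illustrative, the Receiver<Event> parameterization is assumed from RawConnection, and the Event/PME aliases are the ones imported in tests.rs.

    // Illustrative sketch (not part of the patch): resolving the outcome of a single
    // connection by matching PeerManager events on its StreamId.
    async fn wait_for_handshake(mut events: broadcast::Receiver<Event>, stream_id: tcp::StreamId) {
        events
            .recv_until(|ev| match ev {
                // The handshake for this particular stream completed: done.
                Event::PeerManager(PME::HandshakeCompleted(ev)) if ev.stream_id == stream_id => {
                    Some(())
                }
                // The same stream was closed instead: surface the reason.
                Event::PeerManager(PME::ConnectionClosed(ev)) if ev.stream_id == stream_id => {
                    panic!("handshake failed: {}", ev.reason)
                }
                // Events for unrelated streams are ignored.
                _ => None,
            })
            .await;
    }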
@@ -41,11 +45,10 @@ async fn repeated_data_in_sync_routing_table() { network: chain.make_config(rng), chain, peers: vec![], - start_handshake_with: Some(PeerId::new(pm.cfg.node_key.public_key())), force_encoding: Some(Encoding::Proto), nonce: None, }; - let stream = TcpStream::connect(pm.cfg.node_addr.unwrap()).await.unwrap(); + let stream = tcp::Stream::connect(&pm.peer_info()).await.unwrap(); let mut peer = peer::testonly::PeerHandle::start_endpoint(clock.clock(), cfg, stream).await; let edge = peer.complete_handshake().await; @@ -124,11 +127,10 @@ async fn no_edge_broadcast_after_restart() { network: chain.make_config(rng), chain: chain.clone(), peers: vec![], - start_handshake_with: Some(PeerId::new(pm.cfg.node_key.public_key())), force_encoding: Some(Encoding::Proto), nonce: None, }; - let stream = TcpStream::connect(pm.cfg.node_addr.unwrap()).await.unwrap(); + let stream = tcp::Stream::connect(&pm.peer_info()).await.unwrap(); let mut peer = peer::testonly::PeerHandle::start_endpoint(clock.clock(), cfg, stream).await; let edge = peer.complete_handshake().await; @@ -217,12 +219,11 @@ async fn test_nonces() { network: chain.make_config(rng), chain: chain.clone(), peers: vec![], - start_handshake_with: Some(PeerId::new(pm.cfg.node_key.public_key())), force_encoding: Some(Encoding::Proto), // Connect with nonce equal to unix timestamp nonce: test.0, }; - let stream = TcpStream::connect(pm.cfg.node_addr.unwrap()).await.unwrap(); + let stream = tcp::Stream::connect(&pm.peer_info()).await.unwrap(); let mut peer = peer::testonly::PeerHandle::start_endpoint(clock.clock(), cfg, stream).await; if test.1 { peer.complete_handshake().await; @@ -251,11 +252,10 @@ async fn ttl() { network: chain.make_config(rng), chain, peers: vec![], - start_handshake_with: Some(PeerId::new(pm.cfg.node_key.public_key())), force_encoding: Some(Encoding::Proto), nonce: None, }; - let stream = TcpStream::connect(pm.cfg.node_addr.unwrap()).await.unwrap(); + let stream = tcp::Stream::connect(&pm.peer_info()).await.unwrap(); let mut peer = peer::testonly::PeerHandle::start_endpoint(clock.clock(), cfg, stream).await; peer.complete_handshake().await; // await for peer manager to compute the routing table. @@ -576,13 +576,80 @@ async fn connection_spam_security_test() { } // Try to establish additional connections. Should fail. for _ in 0..10 { - pm.start_inbound(chain.clone(), chain.make_config(rng)) - .await - .fail_handshake(&clock.clock()) - .await; + let conn = pm.start_inbound(chain.clone(), chain.make_config(rng)).await; + assert_eq!( + ClosingReason::TooManyInbound, + conn.manager_fail_handshake(&clock.clock()).await + ); } // Terminate the pending connections. Should succeed. for c in conns { c.handshake(&clock.clock()).await; } } + +#[tokio::test] +async fn loop_connection() { + init_test_logger(); + let mut rng = make_rng(921853233); + let rng = &mut rng; + let mut clock = time::FakeClock::default(); + let chain = Arc::new(data::Chain::make(&mut clock, rng, 10)); + + let pm = peer_manager::testonly::start( + clock.clock(), + near_store::db::TestDB::new(), + chain.make_config(rng), + chain.clone(), + ) + .await; + let mut cfg = chain.make_config(rng); + cfg.node_key = pm.cfg.node_key.clone(); + + // Starting an outbound loop connection should be stopped without sending the handshake. 
+ let conn = pm.start_outbound(chain.clone(), cfg).await; + assert_eq!( + ClosingReason::OutboundNotAllowed(connection::PoolError::LoopConnection), + conn.manager_fail_handshake(&clock.clock()).await + ); + + // An inbound connection pretending to be a loop should be rejected. + let stream = tcp::Stream::connect(&pm.peer_info()).await.unwrap(); + let stream_id = stream.id(); + let port = stream.local_addr.port(); + let mut events = pm.events.from_now(); + let mut stream = Stream::new(Some(Encoding::Proto), stream); + stream + .write(&PeerMessage::Handshake(Handshake { + protocol_version: PROTOCOL_VERSION, + oldest_supported_version: PROTOCOL_VERSION, + sender_peer_id: pm.cfg.node_id(), + target_peer_id: pm.cfg.node_id(), + sender_listen_port: Some(port), + sender_chain_info: chain.get_peer_chain_info(), + partial_edge_info: PartialEdgeInfo::new( + &pm.cfg.node_id(), + &pm.cfg.node_id(), + 1, + &pm.cfg.node_key, + ), + })) + .await; + let reason = events + .recv_until(|ev| match ev { + Event::PeerManager(PME::ConnectionClosed(ev)) if ev.stream_id == stream_id => { + Some(ev.reason) + } + Event::PeerManager(PME::HandshakeCompleted(ev)) if ev.stream_id == stream_id => { + panic!("PeerManager accepted the handshake") + } + _ => None, + }) + .await; + assert_eq!( + ClosingReason::RejectedByPeerManager(RegisterPeerError::PoolError( + connection::PoolError::LoopConnection + )), + reason + ); +} diff --git a/chain/network/src/private_actix.rs b/chain/network/src/private_actix.rs index 92410afa07f..3b631b525b6 100644 --- a/chain/network/src/private_actix.rs +++ b/chain/network/src/private_actix.rs @@ -63,7 +63,7 @@ pub(crate) struct RegisterPeer { pub connection: Arc, } -#[derive(Debug)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum RegisterPeerError { Blacklisted, Banned, diff --git a/chain/network/src/stats/metrics.rs b/chain/network/src/stats/metrics.rs index fdfe43331ad..b9251d0d147 100644 --- a/chain/network/src/stats/metrics.rs +++ b/chain/network/src/stats/metrics.rs @@ -320,13 +320,6 @@ pub(crate) static PEER_REACHABLE: Lazy = Lazy::new(|| { ) .unwrap() }); -pub static RECEIVED_INFO_ABOUT_ITSELF: Lazy = Lazy::new(|| { - try_create_int_counter( - "received_info_about_itself", - "Number of times a peer tried to connect to itself", - ) - .unwrap() -}); static DROPPED_MESSAGE_COUNT: Lazy = Lazy::new(|| { try_create_int_counter_vec( "near_dropped_message_by_type_and_reason_count", diff --git a/chain/network/src/tcp.rs b/chain/network/src/tcp.rs new file mode 100644 index 00000000000..125fe74a5b5 --- /dev/null +++ b/chain/network/src/tcp.rs @@ -0,0 +1,119 @@ +use crate::network_protocol::PeerInfo; +use anyhow::{anyhow, Context as _}; +use near_primitives::network::PeerId; + +#[derive(Clone, Debug)] +pub(crate) enum StreamType { + Inbound, + Outbound { peer_id: PeerId }, +} + +#[derive(Debug)] +pub struct Stream { + pub(crate) stream: tokio::net::TcpStream, + pub(crate) type_: StreamType, + /// cached stream.local_addr() + pub(crate) local_addr: std::net::SocketAddr, + /// cached peer_addr.local_addr() + pub(crate) peer_addr: std::net::SocketAddr, +} + +/// TEST-ONLY. Used to identify events relevant to a specific TCP connection in unit tests. +/// Every outbound TCP connection has a unique TCP port (while inbound TCP connections +/// have the same port as the TCP listen socket). +/// We are assuming here that the unit test is executed on a single machine on the loopback +/// network interface, so that both inbound and outbound IP is always 127.0.0.1. 
+/// To create a reliable StreamId for a distributed setup, we would have to transmit it over the connection itself,
+/// which is doable, but not yet needed in our testing framework.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) struct StreamId {
+    inbound: std::net::SocketAddr,
+    outbound: std::net::SocketAddr,
+}
+
+#[cfg(test)]
+pub(crate) struct Socket(tokio::net::TcpSocket);
+
+#[cfg(test)]
+impl Socket {
+    pub fn bind_v4() -> Self {
+        let socket = tokio::net::TcpSocket::new_v4().unwrap();
+        socket.bind("127.0.0.1:0".parse().unwrap()).unwrap();
+        Self(socket)
+    }
+
+    pub async fn connect(self, peer_info: &PeerInfo) -> Stream {
+        // TODO(gprusak): this could replace Stream::connect,
+        // however this means that we will have to replicate everything
+        // that tokio::net::TcpStream sets on the socket.
+        // As long as Socket::connect is test-only we may ignore that.
+        let stream = self.0.connect(peer_info.addr.unwrap()).await.unwrap();
+        Stream::new(stream, StreamType::Outbound { peer_id: peer_info.id.clone() }).unwrap()
+    }
+}
+
+impl Stream {
+    fn new(stream: tokio::net::TcpStream, type_: StreamType) -> std::io::Result<Self> {
+        Ok(Self { peer_addr: stream.peer_addr()?, local_addr: stream.local_addr()?, stream, type_ })
+    }
+
+    pub async fn connect(peer_info: &PeerInfo) -> anyhow::Result<Stream> {
+        let addr =
+            peer_info.addr.ok_or(anyhow!("Trying to connect to peer with no public address"))?;
+        // The `connect` may take several minutes. This happens when the
+        // `SYN` packet for establishing a TCP connection gets silently
+        // dropped, in which case the default TCP timeout is applied. That's
+        // too long for us, so we shorten it to one second.
+        //
+        // Why exactly a second? It was hard-coded in a library we used
+        // before, so we keep it to preserve behavior. Removing the timeout
+        // completely was observed to break stuff for real on the testnet.
+        let stream = tokio::time::timeout(
+            std::time::Duration::from_secs(1),
+            tokio::net::TcpStream::connect(addr),
+        )
+        .await?
+        .context("TcpStream::connect()")?;
+        Ok(Stream::new(stream, StreamType::Outbound { peer_id: peer_info.id.clone() })?)
+    }
+
+    /// Establishes a loopback TCP connection to localhost with random ports.
+    /// Returns a pair of streams: (outbound, inbound).
+    #[cfg(test)]
+    pub async fn loopback(peer_id: PeerId) -> (Stream, Stream) {
+        let localhost = std::net::SocketAddr::new(std::net::Ipv4Addr::LOCALHOST.into(), 0);
+        let mut listener = Listener::bind(localhost).await.unwrap();
+        let peer_info = PeerInfo {
+            id: peer_id,
+            addr: Some(listener.0.local_addr().unwrap()),
+            account_id: None,
+        };
+        let (outbound, inbound) = tokio::join!(Stream::connect(&peer_info), listener.accept(),);
+        (outbound.unwrap(), inbound.unwrap())
+    }
+
+    // TEST-ONLY. Used in reporting test events.
+    pub(crate) fn id(&self) -> StreamId {
+        match self.type_ {
+            StreamType::Inbound => StreamId { inbound: self.local_addr, outbound: self.peer_addr },
+            StreamType::Outbound { .. } => {
+                StreamId { inbound: self.peer_addr, outbound: self.local_addr }
+            }
+        }
+    }
+}
+
+pub(crate) struct Listener(tokio::net::TcpListener);
+
+impl Listener {
+    // TODO(gprusak): this shouldn't be async. It is only
+    // because TcpListener accepts anything that asynchronously resolves to SocketAddr.
+ pub async fn bind(addr: std::net::SocketAddr) -> std::io::Result { + Ok(Self(tokio::net::TcpListener::bind(addr).await?)) + } + + pub async fn accept(&mut self) -> std::io::Result { + let (stream, _) = self.0.accept().await?; + Stream::new(stream, StreamType::Inbound) + } +} diff --git a/chain/network/src/testonly/stream.rs b/chain/network/src/testonly/stream.rs index 6107fa919e1..d1329cb1266 100644 --- a/chain/network/src/testonly/stream.rs +++ b/chain/network/src/testonly/stream.rs @@ -1,35 +1,20 @@ -/// Stream wraps TcpStream, allowing for sending & receiving PeerMessages. -/// Currently just used in tests, but eventually will replace actix-driven communication. +//! Stream wrapper, which allows for custom interactions with the network protocol. +//! We might want to turn it into a fuzz testing framework for the network protocol. use bytes::BytesMut; -use tokio::io; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net; -use tokio::sync::Mutex as AsyncMutex; use crate::network_protocol::{Encoding, PeerMessage}; +use crate::tcp; pub struct Stream { - pub local_addr: std::net::SocketAddr, - pub peer_addr: std::net::SocketAddr, + stream: tcp::Stream, force_encoding: Option, protocol_buffers_supported: bool, - reader: AsyncMutex>>, - writer: AsyncMutex>>, } impl Stream { - pub fn new(force_encoding: Option, stream: net::TcpStream) -> Self { - let local_addr = stream.local_addr().unwrap(); - let peer_addr = stream.peer_addr().unwrap(); - let (reader, writer) = io::split(stream); - Self { - local_addr, - peer_addr, - force_encoding, - protocol_buffers_supported: false, - reader: AsyncMutex::new(io::BufReader::new(reader)), - writer: AsyncMutex::new(io::BufWriter::new(writer)), - } + pub fn new(force_encoding: Option, stream: tcp::Stream) -> Self { + Self { stream, force_encoding, protocol_buffers_supported: false } } fn encoding(&self) -> Option { @@ -43,12 +28,11 @@ impl Stream { } pub async fn read(&mut self) -> PeerMessage { - let mut reader = self.reader.lock().await; 'read: loop { - let n = reader.read_u32_le().await.unwrap() as usize; + let n = self.stream.stream.read_u32_le().await.unwrap() as usize; let mut buf = BytesMut::new(); buf.resize(n, 0); - reader.read_exact(&mut buf[..]).await.unwrap(); + self.stream.stream.read_exact(&mut buf[..]).await.unwrap(); for enc in [Encoding::Proto, Encoding::Borsh] { if let Ok(msg) = PeerMessage::deserialize(enc, &buf[..]) { // If deserialize() succeeded but we expected different encoding, ignore the @@ -67,7 +51,7 @@ impl Stream { } } - pub async fn write(&self, msg: &PeerMessage) { + pub async fn write(&mut self, msg: &PeerMessage) { if let Some(enc) = self.encoding() { self.write_encoded(&msg.serialize(enc)).await; } else { @@ -76,10 +60,9 @@ impl Stream { } } - async fn write_encoded(&self, msg: &[u8]) { - let mut writer = self.writer.lock().await; - writer.write_u32_le(msg.len() as u32).await.unwrap(); - writer.write_all(msg).await.unwrap(); - writer.flush().await.unwrap(); + async fn write_encoded(&mut self, msg: &[u8]) { + self.stream.stream.write_u32_le(msg.len() as u32).await.unwrap(); + self.stream.stream.write_all(msg).await.unwrap(); + self.stream.stream.flush().await.unwrap(); } } diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 0430a62bb88..07543008a90 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -113,16 +113,6 @@ impl KnownPeerState { } } -/// Actor message that holds the TCP stream from an inbound TCP connection -#[derive(actix::Message, Debug)] -#[rtype(result = 
"()")] -pub struct InboundTcpConnect(pub tokio::net::TcpStream); - -/// Actor message to request the creation of an outbound TCP connection to a peer. -#[derive(actix::Message, Clone, Debug)] -#[rtype(result = "()")] -pub struct OutboundTcpConnect(pub PeerInfo); - impl KnownPeerStatus { pub fn is_banned(&self) -> bool { matches!(self, KnownPeerStatus::Banned(_, _)) @@ -176,7 +166,7 @@ pub enum PeerManagerMessageRequest { /// Request PeerManager to connect to the given peer. /// Used in tests and internally by PeerManager. /// TODO: replace it with AsyncContext::spawn/run_later for internal use. - OutboundTcpConnect(OutboundTcpConnect), + OutboundTcpConnect(crate::tcp::Stream), /// TEST-ONLY SetAdvOptions(crate::test_utils::SetAdvOptions), /// The following types of requests are used to trigger actions in the Peer Manager for testing. @@ -227,8 +217,8 @@ impl PeerManagerMessageRequest { #[derive(actix::MessageResponse, Debug)] pub enum PeerManagerMessageResponse { NetworkResponses(NetworkResponses), - OutboundTcpConnect, /// TEST-ONLY + OutboundTcpConnect, SetAdvOptions, FetchRoutingTable(RoutingTableInfo), PingTo, @@ -638,8 +628,6 @@ mod tests { assert_size!(RawRoutedMessage); assert_size!(RoutedMessage); assert_size!(KnownPeerState); - assert_size!(InboundTcpConnect); - assert_size!(OutboundTcpConnect); assert_size!(Ban); assert_size!(StateResponseInfoV1); assert_size!(PartialEncodedChunkRequestMsg); diff --git a/integration-tests/src/tests/network/peer_handshake.rs b/integration-tests/src/tests/network/peer_handshake.rs index 404bcaf1290..c96ace8da5b 100644 --- a/integration-tests/src/tests/network/peer_handshake.rs +++ b/integration-tests/src/tests/network/peer_handshake.rs @@ -212,9 +212,5 @@ fn check_connection_with_new_identity() -> anyhow::Result<()> { runner.push(Action::Wait(time::Duration::milliseconds(2000))); - // Check the no node tried to connect to itself in this process. 
- #[cfg(feature = "test_features")] - runner.push_action(wait_for(|| near_network::RECEIVED_INFO_ABOUT_ITSELF.get() == 0)); - start_test(runner) } diff --git a/integration-tests/src/tests/network/runner.rs b/integration-tests/src/tests/network/runner.rs index eb6b22930e9..85d881a1a0b 100644 --- a/integration-tests/src/tests/network/runner.rs +++ b/integration-tests/src/tests/network/runner.rs @@ -10,14 +10,15 @@ use near_network::actix::ActixSystem; use near_network::blacklist; use near_network::broadcast; use near_network::config; +use near_network::tcp; use near_network::test_utils::{ expected_routing_tables, open_port, peer_id_from_seed, BanPeerSignal, GetInfo, }; use near_network::time; use near_network::types::NetworkRecipient; use near_network::types::{ - OutboundTcpConnect, PeerInfo, PeerManagerMessageRequest, PeerManagerMessageResponse, - Ping as NetPing, Pong as NetPong, ROUTED_MESSAGE_TTL, + PeerInfo, PeerManagerMessageRequest, PeerManagerMessageResponse, Ping as NetPing, + Pong as NetPong, ROUTED_MESSAGE_TTL, }; use near_network::{Event, PeerManagerActor}; use near_o11y::testonly::init_test_logger; @@ -262,13 +263,14 @@ impl StateMachine { debug!(target: "network", num_prev_actions, action = ?action_clone, "runner.rs: Action"); let pm = info.get_node(from)?.actix.addr.clone(); let peer_info = info.runner.test_config[to].peer_info(); - let peer_id = peer_info.id.clone(); - pm.send(PeerManagerMessageRequest::OutboundTcpConnect( - OutboundTcpConnect(peer_info), - )).await?; + match tcp::Stream::connect(&peer_info).await { + Ok(stream) => { pm.send(PeerManagerMessageRequest::OutboundTcpConnect(stream)).await?; }, + Err(err) => tracing::debug!("tcp::Stream::connect({peer_info}): {err}"), + } if !force { return Ok(ControlFlow::Break(())) } + let peer_id = peer_info.id.clone(); let res = pm.send(GetInfo{}).await?; for peer in &res.connected_peers { if peer.full_peer_info.peer_info.id==peer_id {
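With runner.rs updated, the end-to-end test-only flow for outbound connections becomes: dial the TCP connection yourself, then hand the established tcp::Stream to the PeerManagerActor via OutboundTcpConnect, which now carries a stream rather than a PeerInfo. A minimal sketch of that flow, assuming an actix address of the peer manager and a PeerInfo are at hand; the helper name connect_outbound is illustrative only.

    // Illustrative sketch (not part of the patch): the caller owns TCP dialing
    // (including the 1s connect timeout in tcp::Stream::connect); the actor only
    // receives an already-established stream.
    async fn connect_outbound(
        pm: actix::Addr<near_network::PeerManagerActor>,
        peer_info: near_network::types::PeerInfo,
    ) -> anyhow::Result<()> {
        let stream = near_network::tcp::Stream::connect(&peer_info).await?;
        pm.send(near_network::types::PeerManagerMessageRequest::OutboundTcpConnect(stream))
            .await?;
        Ok(())
    }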