From 7ea9809da6cc0a547bf79e68a3ac0454f53a9bda Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 15 Jan 2022 21:27:24 +0100 Subject: [PATCH 01/22] protocols/: Implement Direct Connection Upgrade through Relay (DCUtR) Enables two peers to coordinate a hole punch (direct connection upgrade) via a relayed connection. See https://github.com/libp2p/specs/blob/master/relay/DCUtR.md for specification. --- Cargo.toml | 3 + misc/metrics/Cargo.toml | 2 + misc/metrics/src/dcutr.rs | 89 +++++ misc/metrics/src/lib.rs | 6 + protocols/dcutr/Cargo.toml | 40 +++ protocols/dcutr/build.rs | 23 ++ protocols/dcutr/examples/client.rs | 275 ++++++++++++++++ protocols/dcutr/src/behaviour.rs | 400 +++++++++++++++++++++++ protocols/dcutr/src/handler.rs | 81 +++++ protocols/dcutr/src/handler/direct.rs | 114 +++++++ protocols/dcutr/src/handler/relayed.rs | 380 +++++++++++++++++++++ protocols/dcutr/src/lib.rs | 30 ++ protocols/dcutr/src/message.proto | 19 ++ protocols/dcutr/src/protocol.rs | 26 ++ protocols/dcutr/src/protocol/inbound.rs | 151 +++++++++ protocols/dcutr/src/protocol/outbound.rs | 159 +++++++++ protocols/dcutr/tests/lib.rs | 285 ++++++++++++++++ src/lib.rs | 4 + 18 files changed, 2087 insertions(+) create mode 100644 misc/metrics/src/dcutr.rs create mode 100644 protocols/dcutr/Cargo.toml create mode 100644 protocols/dcutr/build.rs create mode 100644 protocols/dcutr/examples/client.rs create mode 100644 protocols/dcutr/src/behaviour.rs create mode 100644 protocols/dcutr/src/handler.rs create mode 100644 protocols/dcutr/src/handler/direct.rs create mode 100644 protocols/dcutr/src/handler/relayed.rs create mode 100644 protocols/dcutr/src/lib.rs create mode 100644 protocols/dcutr/src/message.proto create mode 100644 protocols/dcutr/src/protocol.rs create mode 100644 protocols/dcutr/src/protocol/inbound.rs create mode 100644 protocols/dcutr/src/protocol/outbound.rs create mode 100644 protocols/dcutr/tests/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 4914493b724..68a3e0deba6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ default = [ "yamux", ] autonat = ["libp2p-autonat"] +dcutr = ["libp2p-dcutr", "libp2p-metrics/dcutr"] deflate = ["libp2p-deflate"] dns-async-std = ["libp2p-dns", "libp2p-dns/async-std"] dns-tokio = ["libp2p-dns", "libp2p-dns/tokio"] @@ -78,6 +79,7 @@ lazy_static = "1.2" libp2p-autonat = { version = "0.20.0", path = "protocols/autonat", optional = true } libp2p-core = { version = "0.31.0", path = "core", default-features = false } +libp2p-dcutr = { version = "0.1.0", path = "protocols/dcutr", optional = true } libp2p-floodsub = { version = "0.33.0", path = "protocols/floodsub", optional = true } libp2p-gossipsub = { version = "0.35.0", path = "./protocols/gossipsub", optional = true } libp2p-identify = { version = "0.33.0", path = "protocols/identify", optional = true } @@ -124,6 +126,7 @@ members = [ "misc/peer-id-generator", "muxers/mplex", "muxers/yamux", + "protocols/dcutr", "protocols/autonat", "protocols/floodsub", "protocols/gossipsub", diff --git a/misc/metrics/Cargo.toml b/misc/metrics/Cargo.toml index 8ed8f2b59dc..1d2f65fbbdc 100644 --- a/misc/metrics/Cargo.toml +++ b/misc/metrics/Cargo.toml @@ -16,8 +16,10 @@ identify = ["libp2p-identify"] kad = ["libp2p-kad"] ping = ["libp2p-ping"] relay = ["libp2p-relay"] +dcutr = ["libp2p-dcutr"] [dependencies] +libp2p-dcutr = { version = "0.1.0", path = "../../protocols/dcutr", optional = true } libp2p-core = { version = "0.31.0", path = "../../core", default-features = false } libp2p-gossipsub = { version = "0.35.0", path = "../../protocols/gossipsub", optional = true } libp2p-identify = { version = "0.33.0", path = "../../protocols/identify", optional = true } diff --git a/misc/metrics/src/dcutr.rs b/misc/metrics/src/dcutr.rs new file mode 100644 index 00000000000..1632a68996d --- /dev/null +++ b/misc/metrics/src/dcutr.rs @@ -0,0 +1,89 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use open_metrics_client::encoding::text::Encode; +use open_metrics_client::metrics::counter::Counter; +use open_metrics_client::metrics::family::Family; +use open_metrics_client::registry::Registry; + +pub struct Metrics { + events: Family, +} + +impl Metrics { + pub fn new(registry: &mut Registry) -> Self { + let sub_registry = registry.sub_registry_with_prefix("relay"); + + let events = Family::default(); + sub_registry.register( + "events", + "Events emitted by the relay NetworkBehaviour", + Box::new(events.clone()), + ); + + Self { events } + } +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq, Encode)] +struct EventLabels { + event: EventType, +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq, Encode)] +enum EventType { + InitiateDirectConnectionUpgrade, + RemoteInitiatedDirectConnectionUpgrade, + DirectConnectionUpgradeSucceeded, + DirectConnectionUpgradeFailed, +} + +impl From<&libp2p_dcutr::behaviour::Event> for EventType { + fn from(event: &libp2p_dcutr::behaviour::Event) -> Self { + match event { + libp2p_dcutr::behaviour::Event::InitiateDirectConnectionUpgrade { + remote_peer_id: _, + local_relayed_addr: _, + } => EventType::InitiateDirectConnectionUpgrade, + libp2p_dcutr::behaviour::Event::RemoteInitiatedDirectConnectionUpgrade { + remote_peer_id: _, + remote_relayed_addr: _, + } => EventType::RemoteInitiatedDirectConnectionUpgrade, + libp2p_dcutr::behaviour::Event::DirectConnectionUpgradeSucceeded { remote_peer_id: _ } => { + EventType::DirectConnectionUpgradeSucceeded + } + libp2p_dcutr::behaviour::Event::DirectConnectionUpgradeFailed { + remote_peer_id: _, + error: _, + } => EventType::DirectConnectionUpgradeFailed, + } + } +} + +impl super::Recorder for super::Metrics { + fn record(&self, event: &libp2p_dcutr::behaviour::Event) { + self.dcutr + .events + .get_or_create(&EventLabels { + event: event.into(), + }) + .inc(); + } +} diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index 0a507d7ae62..7d6e7bc7f92 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -25,6 +25,8 @@ //! //! See `examples` directory for more. +#[cfg(feature = "dcutr")] +mod dcutr; #[cfg(feature = "gossipsub")] mod gossipsub; #[cfg(feature = "identify")] @@ -41,6 +43,8 @@ use open_metrics_client::registry::Registry; /// Set of Swarm and protocol metrics derived from emitted events. pub struct Metrics { + #[cfg(feature = "dcutr")] + dcutr: dcutr::Metrics, #[cfg(feature = "gossipsub")] gossipsub: gossipsub::Metrics, #[cfg(feature = "identify")] @@ -66,6 +70,8 @@ impl Metrics { pub fn new(registry: &mut Registry) -> Self { let sub_registry = registry.sub_registry_with_prefix("libp2p"); Self { + #[cfg(feature = "dcutr")] + dcutr: dcutr::Metrics::new(sub_registry), #[cfg(feature = "gossipsub")] gossipsub: gossipsub::Metrics::new(sub_registry), #[cfg(feature = "identify")] diff --git a/protocols/dcutr/Cargo.toml b/protocols/dcutr/Cargo.toml new file mode 100644 index 00000000000..8bc45ad1f1d --- /dev/null +++ b/protocols/dcutr/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "libp2p-dcutr" +edition = "2021" +rust-version = "1.56.1" +description = "Direct connection upgrade through relay" +version = "0.1.0" +authors = ["Max Inden "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +asynchronous-codec = "0.6" +bytes = "1" +either = "1.6.0" +futures = "0.3.1" +futures-timer = "3.0" +instant = "0.1.11" +libp2p-core = { version = "0.31", path = "../../core" } +libp2p-swarm = { version = "0.33", path = "../../swarm" } +log = "0.4" +prost = "0.7" +thiserror = "1.0" +unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } +void = "1" + +[build-dependencies] +prost-build = "0.7" + +[dev-dependencies] +env_logger = "0.8.3" +libp2p = { path = "../..", features = ["dcutr"] } +libp2p-identify = { path = "../identify" } +libp2p-ping = { path = "../ping" } +libp2p-plaintext = { path = "../../transports/plaintext" } +libp2p-relay = { path = "../relay" } +libp2p-yamux = { path = "../../muxers/yamux" } +rand = "0.7" +structopt = "0.3.21" \ No newline at end of file diff --git a/protocols/dcutr/build.rs b/protocols/dcutr/build.rs new file mode 100644 index 00000000000..b159bb4c817 --- /dev/null +++ b/protocols/dcutr/build.rs @@ -0,0 +1,23 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +fn main() { + prost_build::compile_protos(&["src/message.proto"], &["src"]).unwrap(); +} diff --git a/protocols/dcutr/examples/client.rs b/protocols/dcutr/examples/client.rs new file mode 100644 index 00000000000..4da42198ec9 --- /dev/null +++ b/protocols/dcutr/examples/client.rs @@ -0,0 +1,275 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::executor::block_on; +use futures::future::FutureExt; +use futures::stream::StreamExt; +use libp2p::core::multiaddr::{Multiaddr, Protocol}; +use libp2p::core::transport::OrTransport; +use libp2p::core::upgrade; +use libp2p::dcutr; +use libp2p::dns::DnsConfig; +use libp2p::identify::{Identify, IdentifyConfig, IdentifyEvent, IdentifyInfo}; +use libp2p::noise; +use libp2p::ping::{Ping, PingConfig, PingEvent}; +use libp2p::relay::v2::client::{self, Client}; +use libp2p::swarm::{SwarmBuilder, SwarmEvent}; +use libp2p::tcp::TcpConfig; +use libp2p::Transport; +use libp2p::{identity, NetworkBehaviour, PeerId}; +use log::info; +use std::convert::TryInto; +use std::error::Error; +use std::net::Ipv4Addr; +use std::str::FromStr; +use std::task::{Context, Poll}; +use structopt::StructOpt; + +#[derive(Debug, StructOpt)] +#[structopt(name = "libp2p DCUtR client")] +struct Opts { + /// The mode (relay, client-listen, client-dial) + #[structopt(long)] + mode: Mode, + + /// Fixed value to generate deterministic peer id + #[structopt(long)] + secret_key_seed: u8, + + /// The listening address + #[structopt(long)] + relay_address: Multiaddr, + + /// Peer ID of the remote peer to hole punch to. + #[structopt(long)] + remote_peer_id: Option, +} + +#[derive(Debug, StructOpt)] +enum Mode { + Dial, + Listen, +} + +impl FromStr for Mode { + type Err = String; + fn from_str(mode: &str) -> Result { + match mode { + "dial" => Ok(Mode::Dial), + "listen" => Ok(Mode::Listen), + _ => Err("Expected either 'dial' or 'listen'".to_string()), + } + } +} + +fn main() -> Result<(), Box> { + env_logger::init(); + + let opts = Opts::from_args(); + + let local_key = generate_ed25519(opts.secret_key_seed); + let local_peer_id = PeerId::from(local_key.public()); + info!("Local peer id: {:?}", local_peer_id); + + let (relay_transport, client) = Client::new_transport_and_behaviour(local_peer_id); + + let noise_keys = noise::Keypair::::new() + .into_authentic(&local_key) + .expect("Signing libp2p-noise static DH keypair failed."); + + let transport = OrTransport::new( + relay_transport, + block_on(DnsConfig::system(TcpConfig::new().port_reuse(true))).unwrap(), + ) + .upgrade(upgrade::Version::V1) + .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) + .multiplex(libp2p_yamux::YamuxConfig::default()) + .boxed(); + + #[derive(NetworkBehaviour)] + #[behaviour(out_event = "Event", event_process = false)] + struct Behaviour { + relay_client: Client, + ping: Ping, + identify: Identify, + dcutr: dcutr::behaviour::Behaviour, + } + + #[derive(Debug)] + enum Event { + Ping(PingEvent), + Identify(IdentifyEvent), + Relay(client::Event), + Dcutr(dcutr::behaviour::Event), + } + + impl From for Event { + fn from(e: PingEvent) -> Self { + Event::Ping(e) + } + } + + impl From for Event { + fn from(e: IdentifyEvent) -> Self { + Event::Identify(e) + } + } + + impl From for Event { + fn from(e: client::Event) -> Self { + Event::Relay(e) + } + } + + impl From for Event { + fn from(e: dcutr::behaviour::Event) -> Self { + Event::Dcutr(e) + } + } + + let behaviour = Behaviour { + relay_client: client, + ping: Ping::new(PingConfig::new()), + identify: Identify::new(IdentifyConfig::new( + "/TODO/0.0.1".to_string(), + local_key.public(), + )), + dcutr: dcutr::behaviour::Behaviour::new(), + }; + + let mut swarm = SwarmBuilder::new(transport, behaviour, local_peer_id) + .dial_concurrency_factor(10_u8.try_into().unwrap()) + .build(); + + swarm + .listen_on( + Multiaddr::empty() + .with("0.0.0.0".parse::().unwrap().into()) + .with(Protocol::Tcp(0)), + ) + .unwrap(); + + // Wait to listen on localhost. + block_on(async { + let mut delay = futures_timer::Delay::new(std::time::Duration::from_secs(1)).fuse(); + loop { + futures::select! { + event = swarm.next() => { + match event.unwrap() { + SwarmEvent::NewListenAddr { address, .. } => { + info!("Listening on {:?}", address); + } + event => panic!("{:?}", event), + } + } + _ = delay => { + break; + } + } + } + }); + + match opts.mode { + Mode::Dial => { + swarm.dial(opts.relay_address.clone()).unwrap(); + } + Mode::Listen => { + swarm + .listen_on(opts.relay_address.clone().with(Protocol::P2pCircuit)) + .unwrap(); + } + } + + // Wait till connected to relay to learn external address. + block_on(async { + loop { + match swarm.next().await.unwrap() { + SwarmEvent::NewListenAddr { .. } => {} + SwarmEvent::Dialing { .. } => {} + SwarmEvent::ConnectionEstablished { .. } => {} + SwarmEvent::Behaviour(Event::Ping(_)) => {} + SwarmEvent::Behaviour(Event::Relay(_)) => {} + SwarmEvent::Behaviour(Event::Identify(IdentifyEvent::Sent { .. })) => {} + SwarmEvent::Behaviour(Event::Identify(IdentifyEvent::Received { + info: IdentifyInfo { observed_addr, .. }, + .. + })) => { + info!("Observed address: {:?}", observed_addr); + break; + } + event => panic!("{:?}", event), + } + } + }); + + if matches!(opts.mode, Mode::Dial) { + swarm + .dial( + opts.relay_address + .clone() + .with(Protocol::P2pCircuit) + .with(Protocol::P2p(opts.remote_peer_id.unwrap().into())), + ) + .unwrap(); + } + + block_on(futures::future::poll_fn(move |cx: &mut Context<'_>| { + loop { + match swarm.poll_next_unpin(cx) { + Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => { + info!("Listening on {:?}", address); + } + Poll::Ready(Some(SwarmEvent::Behaviour(Event::Relay(event)))) => { + info!("{:?}", event) + } + Poll::Ready(Some(SwarmEvent::Behaviour(Event::Dcutr(event)))) => { + info!("{:?}", event) + } + Poll::Ready(Some(SwarmEvent::Behaviour(Event::Identify(event)))) => { + info!("{:?}", event) + } + Poll::Ready(Some(SwarmEvent::Behaviour(Event::Ping(_)))) => {} + Poll::Ready(Some(SwarmEvent::ConnectionEstablished { + peer_id, endpoint, .. + })) => { + info!("Established connection to {:?} via {:?}", peer_id, endpoint); + } + Poll::Ready(Some(SwarmEvent::OutgoingConnectionError { peer_id, error })) => { + info!("Outgoing connection error to {:?}: {:?}", peer_id, error); + } + Poll::Ready(Some(_)) => {} + Poll::Ready(None) => return Poll::Ready(Ok(())), + Poll::Pending => { + break; + } + } + } + Poll::Pending + })) +} + +fn generate_ed25519(secret_key_seed: u8) -> identity::Keypair { + let mut bytes = [0u8; 32]; + bytes[0] = secret_key_seed; + + let secret_key = identity::ed25519::SecretKey::from_bytes(&mut bytes) + .expect("this returns `Err` only if the length is wrong; the length is correct; qed"); + identity::Keypair::Ed25519(secret_key.into()) +} diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs new file mode 100644 index 00000000000..b4b829a383c --- /dev/null +++ b/protocols/dcutr/src/behaviour.rs @@ -0,0 +1,400 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! [`NetworkBehaviour`] to act as a direct connection upgrade through relay node. + +use crate::handler; +use crate::protocol; +use either::Either; +use libp2p_core::connection::{ConnectedPoint, ConnectionId}; +use libp2p_core::multiaddr::Protocol; +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::dial_opts::{self, DialOpts}; +use libp2p_swarm::{ + DialError, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, + PollParameters, ProtocolsHandler, ProtocolsHandlerUpgrErr, +}; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::task::{Context, Poll}; +use thiserror::Error; + +/// The events produced by the [`Behaviour`]. +#[derive(Debug)] +pub enum Event { + InitiateDirectConnectionUpgrade { + remote_peer_id: PeerId, + local_relayed_addr: Multiaddr, + }, + RemoteInitiatedDirectConnectionUpgrade { + remote_peer_id: PeerId, + remote_relayed_addr: Multiaddr, + }, + DirectConnectionUpgradeSucceeded { + remote_peer_id: PeerId, + }, + DirectConnectionUpgradeFailed { + remote_peer_id: PeerId, + error: UpgradeError, + }, +} + +#[derive(Debug, Error)] +pub enum UpgradeError { + #[error("Failed to dial peer.")] + Dial, + #[error("Failed to establish substream: {0}.")] + Handler(ProtocolsHandlerUpgrErr), +} + +pub struct Behaviour { + /// Queue of actions to return when polled. + queued_actions: VecDeque, + + /// All direct (non-relayed) connections. + direct_connections: HashMap>, +} + +impl Behaviour { + pub fn new() -> Self { + Behaviour { + queued_actions: Default::default(), + direct_connections: Default::default(), + } + } +} + +impl NetworkBehaviour for Behaviour { + type ProtocolsHandler = handler::Prototype; + type OutEvent = Event; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + handler::Prototype::UnknownConnection + } + + fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec { + vec![] + } + + fn inject_connected(&mut self, _peer_id: &PeerId) {} + + fn inject_connection_established( + &mut self, + peer_id: &PeerId, + connection_id: &ConnectionId, + connected_point: &ConnectedPoint, + _failed_addresses: Option<&Vec>, + ) { + if connected_point.is_relayed() { + if connected_point.is_listener() && !self.direct_connections.contains_key(peer_id) { + // TODO: Try dialing the remote peer directly. Specification: + // + // > The protocol starts with the completion of a relay connection from A to B. Upon + // observing the new connection, the inbound peer (here B) checks the addresses + // advertised by A via identify. If that set includes public addresses, then A may + // be reachable by a direct connection, in which case B attempts a unilateral + // connection upgrade by initiating a direct connection to A. + // + // https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol + self.queued_actions.push_back(Action::Connect { + peer_id: *peer_id, + attempt: 1, + handler: NotifyHandler::One(*connection_id), + }); + let local_addr = match connected_point { + ConnectedPoint::Listener { local_addr, .. } => local_addr, + ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."), + }; + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::InitiateDirectConnectionUpgrade { + remote_peer_id: *peer_id, + local_relayed_addr: local_addr.clone(), + }) + .into(), + ); + } + } else { + self.direct_connections + .entry(*peer_id) + .or_default() + .insert(*connection_id); + } + } + + fn inject_dial_failure( + &mut self, + peer_id: Option, + handler: Self::ProtocolsHandler, + _error: &DialError, + ) { + match handler { + handler::Prototype::DirectConnection { + relayed_connection_id, + role: handler::Role::Initiator { attempt }, + } => { + let peer_id = + peer_id.expect("Prototype::DirectConnection is always for known peer."); + if attempt < 3 { + self.queued_actions.push_back(Action::Connect { + peer_id, + handler: NotifyHandler::One(relayed_connection_id), + attempt: attempt + 1, + }); + } else { + self.queued_actions.push_back( + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::One(relayed_connection_id), + event: Either::Left( + handler::relayed::Command::UpgradeFinishedDontKeepAlive, + ), + } + .into(), + ); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent( + Event::DirectConnectionUpgradeFailed { + remote_peer_id: peer_id, + error: UpgradeError::Dial, + }, + ) + .into(), + ); + } + } + _ => {} + } + } + + fn inject_disconnected(&mut self, peer_id: &PeerId) { + assert!(!self.direct_connections.contains_key(peer_id)); + } + + fn inject_connection_closed( + &mut self, + peer_id: &PeerId, + connection_id: &ConnectionId, + connected_point: &ConnectedPoint, + _handler: <::ProtocolsHandler as IntoProtocolsHandler>::Handler, + ) { + if !connected_point.is_relayed() { + let connections = self + .direct_connections + .get_mut(peer_id) + .expect("Peer of direct connection to be tracked."); + connections + .remove(connection_id) + .then(|| ()) + .expect("Direct connection to be tracked."); + if connections.is_empty() { + self.direct_connections.remove(peer_id); + } + } + } + + fn inject_event( + &mut self, + event_source: PeerId, + connection: ConnectionId, + handler_event: <::Handler as ProtocolsHandler>::OutEvent, + ) { + match handler_event { + Either::Left(handler::relayed::Event::InboundConnectRequest { + inbound_connect, + remote_addr, + }) => { + self.queued_actions.push_back(Action::AcceptInboundConnect { + peer_id: event_source, + handler: NotifyHandler::One(connection), + inbound_connect, + }); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent( + Event::RemoteInitiatedDirectConnectionUpgrade { + remote_peer_id: event_source, + remote_relayed_addr: remote_addr, + }, + ) + .into(), + ); + } + Either::Left(handler::relayed::Event::InboundNegotiationFailed { error }) => { + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { + remote_peer_id: event_source, + error: UpgradeError::Handler(error), + }) + .into(), + ); + } + Either::Left(handler::relayed::Event::InboundConnectNegotiated(remote_addrs)) => { + self.queued_actions.push_back( + NetworkBehaviourAction::Dial { + opts: DialOpts::peer_id(event_source) + .addresses(remote_addrs) + .condition(dial_opts::PeerCondition::Always) + .build(), + handler: handler::Prototype::DirectConnection { + relayed_connection_id: connection, + role: handler::Role::Listener, + }, + } + .into(), + ); + } + Either::Left(handler::relayed::Event::OutboundNegotiationFailed { error }) => { + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { + remote_peer_id: event_source, + error: UpgradeError::Handler(error), + }) + .into(), + ); + } + Either::Left(handler::relayed::Event::OutboundConnectNegotiated { + remote_addrs, + attempt, + }) => { + self.queued_actions.push_back( + NetworkBehaviourAction::Dial { + opts: DialOpts::peer_id(event_source) + .condition(dial_opts::PeerCondition::Always) + .addresses(remote_addrs) + .build(), + handler: handler::Prototype::DirectConnection { + relayed_connection_id: connection, + role: handler::Role::Initiator { attempt }, + }, + } + .into(), + ); + } + Either::Right(Either::Left( + handler::direct::Event::DirectConnectionUpgradeSucceeded { + relayed_connection_id, + }, + )) => { + self.queued_actions.push_back( + NetworkBehaviourAction::NotifyHandler { + peer_id: event_source, + handler: NotifyHandler::One(relayed_connection_id), + event: Either::Left( + handler::relayed::Command::UpgradeFinishedDontKeepAlive, + ), + } + .into(), + ); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent( + Event::DirectConnectionUpgradeSucceeded { + remote_peer_id: event_source, + }, + ) + .into(), + ); + return; + } + Either::Right(Either::Right(event)) => void::unreachable(event), + }; + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + poll_parameters: &mut impl PollParameters, + ) -> Poll> { + if let Some(action) = self.queued_actions.pop_front() { + return Poll::Ready(action.build(poll_parameters)); + } + + Poll::Pending + } +} + +/// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`] +/// before being returned in [`Relay::poll`]. +#[allow(clippy::large_enum_variant)] +enum Action { + Done(NetworkBehaviourAction), + Connect { + attempt: u8, + handler: NotifyHandler, + peer_id: PeerId, + }, + AcceptInboundConnect { + inbound_connect: protocol::inbound::PendingConnect, + handler: NotifyHandler, + peer_id: PeerId, + }, +} + +impl From> for Action { + fn from(action: NetworkBehaviourAction) -> Self { + Self::Done(action) + } +} + +impl Action { + fn build( + self, + poll_parameters: &mut impl PollParameters, + ) -> NetworkBehaviourAction { + match self { + Action::Done(action) => action, + Action::AcceptInboundConnect { + inbound_connect, + handler, + peer_id, + } => NetworkBehaviourAction::NotifyHandler { + handler, + peer_id, + event: Either::Left(handler::relayed::Command::AcceptInboundConnect { + inbound_connect, + obs_addrs: poll_parameters + .external_addresses() + .filter(|a| !a.addr.iter().any(|p| p == Protocol::P2pCircuit)) + .map(|a| { + a.addr + .with(Protocol::P2p((*poll_parameters.local_peer_id()).into())) + }) + .collect(), + }), + }, + Action::Connect { + attempt, + handler, + peer_id, + } => NetworkBehaviourAction::NotifyHandler { + handler, + peer_id, + event: Either::Left(handler::relayed::Command::Connect { + attempt, + obs_addrs: poll_parameters + .external_addresses() + .filter(|a| !a.addr.iter().any(|p| p == Protocol::P2pCircuit)) + .map(|a| { + a.addr + .with(Protocol::P2p((*poll_parameters.local_peer_id()).into())) + }) + .collect(), + }), + }, + } + } +} diff --git a/protocols/dcutr/src/handler.rs b/protocols/dcutr/src/handler.rs new file mode 100644 index 00000000000..4dcc0f4d66d --- /dev/null +++ b/protocols/dcutr/src/handler.rs @@ -0,0 +1,81 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::protocol; +use either::Either; +use libp2p_core::connection::ConnectionId; +use libp2p_core::upgrade::{self, DeniedUpgrade}; +use libp2p_core::{ConnectedPoint, PeerId}; +use libp2p_swarm::protocols_handler::DummyProtocolsHandler; +use libp2p_swarm::protocols_handler::SendWrapper; +use libp2p_swarm::{IntoProtocolsHandler, ProtocolsHandler}; + +pub mod direct; +pub mod relayed; + +pub enum Prototype { + DirectConnection { + role: Role, + relayed_connection_id: ConnectionId, + }, + UnknownConnection, +} + +pub enum Role { + Initiator { attempt: u8 }, + Listener, +} + +impl IntoProtocolsHandler for Prototype { + type Handler = Either>; + + fn into_handler(self, _remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { + match self { + Self::UnknownConnection => { + if endpoint.is_relayed() { + Either::Left(relayed::Handler::new(endpoint.clone())) + } else { + Either::Right(Either::Right(DummyProtocolsHandler::default())) + } + } + Self::DirectConnection { + relayed_connection_id, + .. + } => { + assert!( + !endpoint.is_relayed(), + "`Prototype::DirectConnection` is never created for relayed connection." + ); + Either::Right(Either::Left(direct::Handler::new(relayed_connection_id))) + } + } + } + + fn inbound_protocol(&self) -> ::InboundProtocol { + match self { + Prototype::UnknownConnection => upgrade::EitherUpgrade::A(SendWrapper( + upgrade::EitherUpgrade::A(protocol::inbound::Upgrade {}), + )), + Prototype::DirectConnection { .. } => { + upgrade::EitherUpgrade::A(SendWrapper(upgrade::EitherUpgrade::B(DeniedUpgrade))) + } + } + } +} diff --git a/protocols/dcutr/src/handler/direct.rs b/protocols/dcutr/src/handler/direct.rs new file mode 100644 index 00000000000..980e6e7f462 --- /dev/null +++ b/protocols/dcutr/src/handler/direct.rs @@ -0,0 +1,114 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! [`ProtocolsHandler`] handling direct connection upgraded through a relayed connection. + +use libp2p_core::connection::ConnectionId; +use libp2p_core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade}; +use libp2p_swarm::{ + KeepAlive, NegotiatedSubstream, ProtocolsHandler, ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr, SubstreamProtocol, +}; +use std::task::{Context, Poll}; +use void::Void; + +#[derive(Debug)] +pub enum Event { + DirectConnectionUpgradeSucceeded { relayed_connection_id: ConnectionId }, +} + +pub struct Handler { + relayed_connection_id: ConnectionId, + reported: bool, +} + +impl Handler { + pub(crate) fn new(relayed_connection_id: ConnectionId) -> Self { + Self { + reported: false, + relayed_connection_id, + } + } +} + +impl ProtocolsHandler for Handler { + type InEvent = void::Void; + type OutEvent = Event; + type Error = ProtocolsHandlerUpgrErr; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = DeniedUpgrade; + type OutboundOpenInfo = Void; + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()) + } + + fn inject_fully_negotiated_inbound( + &mut self, + _: >::Output, + _: Self::InboundOpenInfo, + ) { + } + + fn inject_fully_negotiated_outbound( + &mut self, + _: >::Output, + _: Self::OutboundOpenInfo, + ) { + } + + fn inject_event(&mut self, _: Self::InEvent) {} + + fn inject_dial_upgrade_error( + &mut self, + _: Self::OutboundOpenInfo, + _: ProtocolsHandlerUpgrErr< + >::Error, + >, + ) { + } + + fn connection_keep_alive(&self) -> KeepAlive { + KeepAlive::No + } + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll< + ProtocolsHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + if !self.reported { + self.reported = true; + return Poll::Ready(ProtocolsHandlerEvent::Custom( + Event::DirectConnectionUpgradeSucceeded { + relayed_connection_id: self.relayed_connection_id, + }, + )); + } + Poll::Pending + } +} diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs new file mode 100644 index 00000000000..1f3e05c597c --- /dev/null +++ b/protocols/dcutr/src/handler/relayed.rs @@ -0,0 +1,380 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! [`ProtocolsHandler`] handling relayed connection potentially upgraded to a direct connection. + +use crate::protocol; +use futures::future::{BoxFuture, FutureExt}; +use futures::stream::{FuturesUnordered, StreamExt}; +use instant::Instant; +use libp2p_core::either::{EitherError, EitherOutput}; +use libp2p_core::multiaddr::Multiaddr; +use libp2p_core::upgrade::{self, DeniedUpgrade, NegotiationError, UpgradeError}; +use libp2p_core::ConnectedPoint; +use libp2p_swarm::protocols_handler::{InboundUpgradeSend, OutboundUpgradeSend}; +use libp2p_swarm::{ + KeepAlive, NegotiatedSubstream, ProtocolsHandler, ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr, SubstreamProtocol, +}; +use std::collections::VecDeque; +use std::fmt; +use std::task::{Context, Poll}; +use std::time::Duration; + +pub enum Command { + Connect { + obs_addrs: Vec, + attempt: u8, + }, + AcceptInboundConnect { + obs_addrs: Vec, + inbound_connect: protocol::inbound::PendingConnect, + }, + /// Upgrading the relayed connection to a direct connection either failed for good or succeeded. + /// There is no need to keep the relayed connection alive for the sake of upgrading to a direct + /// connection. + UpgradeFinishedDontKeepAlive, +} + +impl fmt::Debug for Command { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Command::Connect { obs_addrs, attempt } => f + .debug_struct("Command::Connect") + .field("obs_addrs", obs_addrs) + .field("attempt", attempt) + .finish(), + Command::AcceptInboundConnect { + obs_addrs, + inbound_connect: _, + } => f + .debug_struct("Command::AcceptInboundConnect") + .field("obs_addrs", obs_addrs) + .finish(), + Command::UpgradeFinishedDontKeepAlive => f + .debug_struct("Command::UpgradeFinishedDontKeepAlive") + .finish(), + } + } +} + +pub enum Event { + InboundConnectRequest { + inbound_connect: protocol::inbound::PendingConnect, + remote_addr: Multiaddr, + }, + InboundNegotiationFailed { + error: ProtocolsHandlerUpgrErr, + }, + InboundConnectNegotiated(Vec), + OutboundNegotiationFailed { + error: ProtocolsHandlerUpgrErr, + }, + OutboundConnectNegotiated { + remote_addrs: Vec, + attempt: u8, + }, +} + +impl fmt::Debug for Event { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Event::InboundConnectRequest { + inbound_connect: _, + remote_addr, + } => f + .debug_struct("Event::InboundConnectRequest") + .field("remote_addrs", remote_addr) + .finish(), + Event::InboundNegotiationFailed { error } => f + .debug_struct("Event::InboundNegotiationFailed") + .field("error", error) + .finish(), + Event::InboundConnectNegotiated(addrs) => f + .debug_tuple("Event::InboundConnectNegotiated") + .field(addrs) + .finish(), + Event::OutboundNegotiationFailed { error } => f + .debug_struct("Event::OutboundNegotiationFailed") + .field("error", error) + .finish(), + Event::OutboundConnectNegotiated { + remote_addrs, + attempt, + } => f + .debug_struct("Event::OutboundConnectNegotiated") + .field("remote_addrs", remote_addrs) + .field("attempt", attempt) + .finish(), + } + } +} + +pub struct Handler { + endpoint: ConnectedPoint, + /// A pending fatal error that results in the connection being closed. + pending_error: Option< + ProtocolsHandlerUpgrErr< + EitherError, + >, + >, + /// Queue of events to return when polled. + queued_events: VecDeque< + ProtocolsHandlerEvent< + ::OutboundProtocol, + ::OutboundOpenInfo, + ::OutEvent, + ::Error, + >, + >, + /// Inbound connects, accepted by the behaviour, pending completion. + inbound_connects: FuturesUnordered< + BoxFuture<'static, Result, protocol::inbound::UpgradeError>>, + >, + keep_alive: KeepAlive, +} + +impl Handler { + pub fn new(endpoint: ConnectedPoint) -> Self { + Self { + endpoint, + pending_error: Default::default(), + queued_events: Default::default(), + inbound_connects: Default::default(), + keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(30)), + } + } +} + +impl ProtocolsHandler for Handler { + type InEvent = Command; + type OutEvent = Event; + type Error = ProtocolsHandlerUpgrErr< + EitherError, + >; + type InboundProtocol = upgrade::EitherUpgrade; + type OutboundProtocol = protocol::outbound::Upgrade; + type OutboundOpenInfo = u8; // Number of upgrade attempts. + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + match self.endpoint { + ConnectedPoint::Dialer { .. } => { + SubstreamProtocol::new(upgrade::EitherUpgrade::A(protocol::inbound::Upgrade {}), ()) + } + ConnectedPoint::Listener { .. } => { + SubstreamProtocol::new(upgrade::EitherUpgrade::B(DeniedUpgrade), ()) + } + } + } + + fn inject_fully_negotiated_inbound( + &mut self, + output: >::Output, + _: Self::InboundOpenInfo, + ) { + match output { + EitherOutput::First(inbound_connect) => { + let remote_addr = match & self.endpoint { + ConnectedPoint::Dialer { address } => address.clone(), + ConnectedPoint::Listener { ..} => unreachable!("`::listen_protocol` denies all incoming substreams as a listener."), + }; + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::InboundConnectRequest { + inbound_connect, + remote_addr, + }, + )); + } + // A connection listener denies all incoming substreams, thus none can ever be fully negotiated. + EitherOutput::Second(output) => void::unreachable(output), + } + } + + fn inject_fully_negotiated_outbound( + &mut self, + protocol::outbound::Connect { obs_addrs }: >::Output, + attempt: Self::OutboundOpenInfo, + ) { + assert!( + self.endpoint.is_listener(), + "A connection dialer never initiates a connection upgrade." + ); + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::OutboundConnectNegotiated { + remote_addrs: obs_addrs, + attempt, + }, + )); + } + + fn inject_event(&mut self, event: Self::InEvent) { + match event { + Command::Connect { obs_addrs, attempt } => { + self.queued_events + .push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new( + protocol::outbound::Upgrade::new(obs_addrs), + attempt, + ), + }); + } + Command::AcceptInboundConnect { + inbound_connect, + obs_addrs, + } => { + self.inbound_connects + .push(inbound_connect.accept(obs_addrs).boxed()); + } + Command::UpgradeFinishedDontKeepAlive => { + self.keep_alive = KeepAlive::No; + } + } + } + + fn inject_listen_upgrade_error( + &mut self, + _: Self::InboundOpenInfo, + error: ProtocolsHandlerUpgrErr<::Error>, + ) { + self.keep_alive = KeepAlive::No; + + match error { + ProtocolsHandlerUpgrErr::Timeout => { + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::InboundNegotiationFailed { + error: ProtocolsHandlerUpgrErr::Timeout, + }, + )); + } + ProtocolsHandlerUpgrErr::Timer => { + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::InboundNegotiationFailed { + error: ProtocolsHandlerUpgrErr::Timer, + }, + )); + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + // The remote merely doesn't support the DCUtR protocol. + // This is no reason to close the connection, which may + // successfully communicate with other protocols already. + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::InboundNegotiationFailed { + error: ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::Failed, + )), + }, + )); + } + _ => { + // Anything else is considered a fatal error or misbehaviour of + // the remote peer and results in closing the connection. + self.pending_error = Some(error.map_upgrade_err(|e| { + e.map_err(|e| match e { + EitherError::A(e) => EitherError::A(e), + EitherError::B(v) => void::unreachable(v), + }) + })); + } + } + } + + fn inject_dial_upgrade_error( + &mut self, + _open_info: Self::OutboundOpenInfo, + error: ProtocolsHandlerUpgrErr<::Error>, + ) { + self.keep_alive = KeepAlive::No; + + match error { + ProtocolsHandlerUpgrErr::Timeout => { + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::OutboundNegotiationFailed { + error: ProtocolsHandlerUpgrErr::Timeout, + }, + )); + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + // The remote merely doesn't support the DCUtR protocol. + // This is no reason to close the connection, which may + // successfully communicate with other protocols already. + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::OutboundNegotiationFailed { + error: ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::Failed, + )), + }, + )); + } + _ => { + // Anything else is considered a fatal error or misbehaviour of + // the remote peer and results in closing the connection. + self.pending_error = + Some(error.map_upgrade_err(|e| e.map_err(|e| EitherError::B(e)))); + } + } + } + + fn connection_keep_alive(&self) -> KeepAlive { + self.keep_alive + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ProtocolsHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + // Check for a pending (fatal) error. + if let Some(err) = self.pending_error.take() { + // The handler will not be polled again by the `Swarm`. + return Poll::Ready(ProtocolsHandlerEvent::Close(err)); + } + + // Return queued events. + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event); + } + + while let Poll::Ready(Some(result)) = self.inbound_connects.poll_next_unpin(cx) { + match result { + Ok(addresses) => { + return Poll::Ready(ProtocolsHandlerEvent::Custom( + Event::InboundConnectNegotiated(addresses), + )); + } + Err(e) => { + return Poll::Ready(ProtocolsHandlerEvent::Close( + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))), + )) + } + } + } + + Poll::Pending + } +} diff --git a/protocols/dcutr/src/lib.rs b/protocols/dcutr/src/lib.rs new file mode 100644 index 00000000000..1c8b525e95f --- /dev/null +++ b/protocols/dcutr/src/lib.rs @@ -0,0 +1,30 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the [libp2p direct connection upgrade through relay +//! specification](https://github.com/libp2p/specs/pull/173). + +pub mod behaviour; +mod handler; +mod protocol; + +mod message_proto { + include!(concat!(env!("OUT_DIR"), "/holepunch.pb.rs")); +} diff --git a/protocols/dcutr/src/message.proto b/protocols/dcutr/src/message.proto new file mode 100644 index 00000000000..ab5b220f2ea --- /dev/null +++ b/protocols/dcutr/src/message.proto @@ -0,0 +1,19 @@ +syntax = "proto2"; + +package holepunch.pb; + +message HolePunch { + enum Type { + CONNECT = 100; + SYNC = 300; + } + + required Type type=1; + + // For hole punching, we'll send some additional observed addresses to the remote peer + // that could have been filtered by the Host address factory (for example: AutoRelay removes all public addresses if peer has private reachability). + // This is a hack! + // We plan to have a better address discovery and advertisement mechanism in the future. + // See https://github.com/libp2p/go-libp2p-autonat/pull/98 + repeated bytes ObsAddrs = 2; +} diff --git a/protocols/dcutr/src/protocol.rs b/protocols/dcutr/src/protocol.rs new file mode 100644 index 00000000000..d2b8b39a6d0 --- /dev/null +++ b/protocols/dcutr/src/protocol.rs @@ -0,0 +1,26 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +pub mod inbound; +pub mod outbound; + +const PROTOCOL_NAME: &[u8; 13] = b"/libp2p/dcutr"; + +const MAX_MESSAGE_SIZE_BYTES: usize = 4096; diff --git a/protocols/dcutr/src/protocol/inbound.rs b/protocols/dcutr/src/protocol/inbound.rs new file mode 100644 index 00000000000..5b03f01491d --- /dev/null +++ b/protocols/dcutr/src/protocol/inbound.rs @@ -0,0 +1,151 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::message_proto::{hole_punch, HolePunch}; +use asynchronous_codec::Framed; +use bytes::BytesMut; +use futures::{future::BoxFuture, prelude::*}; +use libp2p_core::{upgrade, Multiaddr}; +use libp2p_swarm::NegotiatedSubstream; +use prost::Message; +use std::convert::TryFrom; +use std::io::Cursor; +use std::iter; +use thiserror::Error; +use unsigned_varint::codec::UviBytes; + +pub struct Upgrade {} + +impl upgrade::UpgradeInfo for Upgrade { + type Info = &'static [u8]; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(super::PROTOCOL_NAME) + } +} + +impl upgrade::InboundUpgrade for Upgrade { + type Output = PendingConnect; + type Error = UpgradeError; + type Future = BoxFuture<'static, Result>; + + fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + let mut codec = UviBytes::default(); + codec.set_max_len(super::MAX_MESSAGE_SIZE_BYTES); + let mut substream = Framed::new(substream, codec); + + async move { + let msg: bytes::BytesMut = substream + .next() + .await + .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??; + + let HolePunch { r#type, obs_addrs } = HolePunch::decode(Cursor::new(msg))?; + + let obs_addrs = if obs_addrs.is_empty() { + return Err(UpgradeError::NoAddresses); + } else { + obs_addrs + .into_iter() + .map(TryFrom::try_from) + .collect::, _>>() + .map_err(|_| UpgradeError::InvalidAddrs)? + }; + + let r#type = hole_punch::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?; + + match r#type { + hole_punch::Type::Connect => {} + hole_punch::Type::Sync => return Err(UpgradeError::UnexpectedTypeSync), + } + + Ok(PendingConnect { + substream, + remote_obs_addrs: obs_addrs, + }) + } + .boxed() + } +} + +pub struct PendingConnect { + substream: Framed, + remote_obs_addrs: Vec, +} + +impl PendingConnect { + pub async fn accept( + mut self, + local_obs_addrs: Vec, + ) -> Result, UpgradeError> { + let msg = HolePunch { + r#type: hole_punch::Type::Connect.into(), + obs_addrs: local_obs_addrs.into_iter().map(|a| a.to_vec()).collect(), + }; + + let mut encoded_msg = BytesMut::new(); + msg.encode(&mut encoded_msg) + .expect("BytesMut to have sufficient capacity."); + + self.substream.send(encoded_msg.freeze()).await?; + let msg: bytes::BytesMut = self + .substream + .next() + .await + .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??; + + let HolePunch { r#type, .. } = HolePunch::decode(Cursor::new(msg))?; + + let r#type = hole_punch::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?; + match r#type { + hole_punch::Type::Connect => return Err(UpgradeError::UnexpectedTypeConnect), + hole_punch::Type::Sync => {} + } + + Ok(self.remote_obs_addrs) + } +} + +#[derive(Debug, Error)] +pub enum UpgradeError { + #[error("Failed to decode response: {0}.")] + Decode( + #[from] + #[source] + prost::DecodeError, + ), + #[error("Io error {0}")] + Io( + #[from] + #[source] + std::io::Error, + ), + #[error("Expected at least one address in reservation.")] + NoAddresses, + #[error("Invalid addresses.")] + InvalidAddrs, + #[error("Failed to parse response type field.")] + ParseTypeField, + #[error("Unexpected message type 'connect'")] + UnexpectedTypeConnect, + #[error("Unexpected message type 'sync'")] + UnexpectedTypeSync, +} diff --git a/protocols/dcutr/src/protocol/outbound.rs b/protocols/dcutr/src/protocol/outbound.rs new file mode 100644 index 00000000000..5a9f96ca28b --- /dev/null +++ b/protocols/dcutr/src/protocol/outbound.rs @@ -0,0 +1,159 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::message_proto::{hole_punch, HolePunch}; +use asynchronous_codec::Framed; +use bytes::BytesMut; +use futures::{future::BoxFuture, prelude::*}; +use futures_timer::Delay; +use libp2p_core::{upgrade, Multiaddr}; +use libp2p_swarm::NegotiatedSubstream; +use prost::Message; +use std::convert::TryFrom; +use std::io::Cursor; +use std::iter; +use std::time::Instant; +use thiserror::Error; +use unsigned_varint::codec::UviBytes; + +pub struct Upgrade { + obs_addrs: Vec, +} + +impl upgrade::UpgradeInfo for Upgrade { + type Info = &'static [u8]; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(super::PROTOCOL_NAME) + } +} + +impl Upgrade { + pub fn new(obs_addrs: Vec) -> Self { + Self { obs_addrs } + } +} + +impl upgrade::OutboundUpgrade for Upgrade { + type Output = Connect; + type Error = UpgradeError; + type Future = BoxFuture<'static, Result>; + + fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + let msg = HolePunch { + r#type: hole_punch::Type::Connect.into(), + obs_addrs: self.obs_addrs.into_iter().map(|a| a.to_vec()).collect(), + }; + + let mut encoded_msg = BytesMut::new(); + msg.encode(&mut encoded_msg) + .expect("BytesMut to have sufficient capacity."); + + let mut codec = UviBytes::default(); + codec.set_max_len(super::MAX_MESSAGE_SIZE_BYTES); + let mut substream = Framed::new(substream, codec); + + async move { + substream.send(encoded_msg.freeze()).await?; + + let sent_time = Instant::now(); + + let msg: bytes::BytesMut = substream + .next() + .await + .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??; + + let rtt = sent_time.elapsed(); + + let HolePunch { r#type, obs_addrs } = HolePunch::decode(Cursor::new(msg))?; + + let r#type = hole_punch::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?; + match r#type { + hole_punch::Type::Connect => {} + hole_punch::Type::Sync => return Err(UpgradeError::UnexpectedTypeSync), + } + + let obs_addrs = if obs_addrs.is_empty() { + return Err(UpgradeError::NoAddresses); + } else { + obs_addrs + .into_iter() + .map(TryFrom::try_from) + .collect::, _>>() + .map_err(|_| UpgradeError::InvalidAddrs)? + }; + + let msg = HolePunch { + r#type: hole_punch::Type::Sync.into(), + obs_addrs: vec![], + }; + + let mut encoded_msg = BytesMut::new(); + msg.encode(&mut encoded_msg) + .expect("BytesMut to have sufficient capacity."); + + substream.send(encoded_msg.freeze()).await?; + + Delay::new(rtt / 2).await; + + Ok(Connect { obs_addrs }) + } + .boxed() + } +} + +pub struct Connect { + pub obs_addrs: Vec, +} + +#[derive(Debug, Error)] +pub enum UpgradeError { + #[error("Failed to decode response: {0}.")] + Decode( + #[from] + #[source] + prost::DecodeError, + ), + #[error("Io error {0}")] + Io( + #[from] + #[source] + std::io::Error, + ), + #[error("Expected 'status' field to be set.")] + MissingStatusField, + #[error("Expected 'reservation' field to be set.")] + MissingReservationField, + #[error("Expected at least one address in reservation.")] + NoAddresses, + #[error("Invalid expiration timestamp in reservation.")] + InvalidReservationExpiration, + #[error("Invalid addresses in reservation.")] + InvalidAddrs, + #[error("Failed to parse response type field.")] + ParseTypeField, + #[error("Unexpected message type 'connect'")] + UnexpectedTypeConnect, + #[error("Unexpected message type 'sync'")] + UnexpectedTypeSync, + #[error("Failed to parse response type field.")] + ParseStatusField, +} diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs new file mode 100644 index 00000000000..0614e7bc49c --- /dev/null +++ b/protocols/dcutr/tests/lib.rs @@ -0,0 +1,285 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::executor::LocalPool; +use futures::future::FutureExt; +use futures::io::{AsyncRead, AsyncWrite}; +use futures::stream::StreamExt; +use futures::task::Spawn; +use libp2p::core::multiaddr::{Multiaddr, Protocol}; +use libp2p::core::muxing::StreamMuxerBox; +use libp2p::core::transport::upgrade::Version; +use libp2p::core::transport::{Boxed, MemoryTransport, OrTransport, Transport}; +use libp2p::core::PublicKey; +use libp2p::core::{identity, PeerId}; +use libp2p::dcutr; +use libp2p::ping::{Ping, PingConfig, PingEvent}; +use libp2p::plaintext::PlainText2Config; +use libp2p::relay::v2::client; +use libp2p::relay::v2::relay; +use libp2p::NetworkBehaviour; +use libp2p_swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent}; +use std::time::Duration; + +#[test] +fn connect() { + let _ = env_logger::try_init(); + let mut pool = LocalPool::new(); + + let relay_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::())); + let mut relay = build_relay(); + let relay_peer_id = *relay.local_peer_id(); + + relay.listen_on(relay_addr.clone()).unwrap(); + relay.add_external_address(relay_addr.clone(), AddressScore::Infinite); + spawn_swarm_on_pool(&pool, relay); + + let mut dst = build_client(); + let dst_peer_id = *dst.local_peer_id(); + let dst_relayed_addr = relay_addr + .clone() + .with(Protocol::P2p(relay_peer_id.into())) + .with(Protocol::P2pCircuit) + .with(Protocol::P2p(dst_peer_id.into())); + let dst_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::())); + + dst.listen_on(dst_relayed_addr.clone()).unwrap(); + dst.listen_on(dst_addr.clone()).unwrap(); + dst.add_external_address(dst_addr.clone(), AddressScore::Infinite); + + pool.run_until(wait_for_reservation( + &mut dst, + dst_relayed_addr.clone(), + relay_peer_id, + false, // No renewal. + )); + spawn_swarm_on_pool(&pool, dst); + + let mut src = build_client(); + let src_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::())); + src.listen_on(src_addr.clone()).unwrap(); + pool.run_until(wait_for_new_listen_addr(&mut src, &src_addr)); + src.add_external_address(src_addr.clone(), AddressScore::Infinite); + + src.dial(dst_relayed_addr.clone()).unwrap(); + + pool.run_until(wait_for_connection_established(&mut src, &dst_relayed_addr)); + match pool.run_until(wait_for_dcutr_event(&mut src)) { + dcutr::behaviour::Event::RemoteInitiatedDirectConnectionUpgrade { + remote_peer_id, + remote_relayed_addr, + } if remote_peer_id == dst_peer_id && remote_relayed_addr == dst_relayed_addr => {} + e => panic!("Unexpected event: {:?}.", e), + } + pool.run_until(wait_for_connection_established( + &mut src, + &dst_addr.with(Protocol::P2p(dst_peer_id.into())), + )); +} + +fn build_relay() -> Swarm { + let local_key = identity::Keypair::generate_ed25519(); + let local_public_key = local_key.public(); + let local_peer_id = local_public_key.clone().to_peer_id(); + + let transport = build_transport(MemoryTransport::default().boxed(), local_public_key); + + Swarm::new( + transport, + Relay { + ping: Ping::new(PingConfig::new()), + relay: relay::Relay::new( + local_peer_id, + relay::Config { + reservation_duration: Duration::from_secs(2), + ..Default::default() + }, + ), + }, + local_peer_id, + ) +} + +fn build_client() -> Swarm { + let local_key = identity::Keypair::generate_ed25519(); + let local_public_key = local_key.public(); + let local_peer_id = local_public_key.clone().to_peer_id(); + + let (relay_transport, behaviour) = client::Client::new_transport_and_behaviour(local_peer_id); + let transport = build_transport( + OrTransport::new(relay_transport, MemoryTransport::default()).boxed(), + local_public_key, + ); + + Swarm::new( + transport, + Client { + ping: Ping::new(PingConfig::new()), + relay: behaviour, + dcutr: dcutr::behaviour::Behaviour::new(), + }, + local_peer_id, + ) +} + +fn build_transport( + transport: Boxed, + local_public_key: PublicKey, +) -> Boxed<(PeerId, StreamMuxerBox)> +where + StreamSink: AsyncRead + AsyncWrite + Send + Unpin + 'static, +{ + let transport = transport + .upgrade(Version::V1) + .authenticate(PlainText2Config { local_public_key }) + .multiplex(libp2p_yamux::YamuxConfig::default()) + .boxed(); + + transport +} + +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "RelayEvent", event_process = false)] +struct Relay { + relay: relay::Relay, + ping: Ping, +} + +#[derive(Debug)] +enum RelayEvent { + Relay(relay::Event), + Ping(PingEvent), +} + +impl From for RelayEvent { + fn from(event: relay::Event) -> Self { + RelayEvent::Relay(event) + } +} + +impl From for RelayEvent { + fn from(event: PingEvent) -> Self { + RelayEvent::Ping(event) + } +} + +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "ClientEvent", event_process = false)] +struct Client { + relay: client::Client, + ping: Ping, + dcutr: dcutr::behaviour::Behaviour, +} + +#[derive(Debug)] +enum ClientEvent { + Relay(client::Event), + Ping(PingEvent), + Dcutr(dcutr::behaviour::Event), +} + +impl From for ClientEvent { + fn from(event: client::Event) -> Self { + ClientEvent::Relay(event) + } +} + +impl From for ClientEvent { + fn from(event: PingEvent) -> Self { + ClientEvent::Ping(event) + } +} + +impl From for ClientEvent { + fn from(event: dcutr::behaviour::Event) -> Self { + ClientEvent::Dcutr(event) + } +} + +fn spawn_swarm_on_pool(pool: &LocalPool, swarm: Swarm) { + pool.spawner() + .spawn_obj(swarm.collect::>().map(|_| ()).boxed().into()) + .unwrap(); +} + +async fn wait_for_reservation( + client: &mut Swarm, + client_addr: Multiaddr, + relay_peer_id: PeerId, + is_renewal: bool, +) { + loop { + match client.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } if address != client_addr => {} + SwarmEvent::Behaviour(ClientEvent::Relay(client::Event::ReservationReqAccepted { + relay_peer_id: peer_id, + renewal, + .. + })) if relay_peer_id == peer_id && renewal == is_renewal => break, + SwarmEvent::Behaviour(ClientEvent::Ping(_)) => {} + SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {} + SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {} + e => panic!("{:?}", e), + } + } + + // Wait for `NewListenAddr` event. + match client.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } if address == client_addr => {} + e => panic!("{:?}", e), + } +} + +async fn wait_for_connection_established(client: &mut Swarm, addr: &Multiaddr) { + loop { + match client.select_next_some().await { + SwarmEvent::IncomingConnection { .. } => {} + SwarmEvent::ConnectionEstablished { endpoint, .. } + if endpoint.get_remote_address() == addr => + { + break + } + SwarmEvent::Dialing(_) => {} + SwarmEvent::Behaviour(ClientEvent::Ping(_)) => {} + SwarmEvent::Behaviour(ClientEvent::Relay( + client::Event::OutboundCircuitEstablished { .. }, + )) => {} + SwarmEvent::ConnectionEstablished { .. } => {} + e => panic!("{:?}", e), + } + } +} + +async fn wait_for_new_listen_addr(client: &mut Swarm, new_addr: &Multiaddr) { + match client.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } if address == *new_addr => {} + e => panic!("{:?}", e), + } +} + +async fn wait_for_dcutr_event(client: &mut Swarm) -> dcutr::behaviour::Event { + loop { + match client.select_next_some().await { + SwarmEvent::Behaviour(ClientEvent::Dcutr(e)) => return e, + SwarmEvent::Behaviour(ClientEvent::Ping(_)) => {} + e => panic!("{:?}", e), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index b913a94121e..dbfe64cb43a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,6 +46,10 @@ pub use multiaddr; pub use libp2p_autonat as autonat; #[doc(inline)] pub use libp2p_core as core; +#[cfg(feature = "dcutr")] +#[cfg_attr(docsrs, doc(cfg(feature = "dcutr")))] +#[doc(inline)] +pub use libp2p_dcutr as dcutr; #[cfg(feature = "deflate")] #[cfg_attr(docsrs, doc(cfg(feature = "deflate")))] #[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))] From e6551aa9b904c4525aba2aa912491d9967ac2d38 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 15 Jan 2022 21:35:46 +0100 Subject: [PATCH 02/22] misc/metrics: Run rust fmt --- misc/metrics/src/dcutr.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/misc/metrics/src/dcutr.rs b/misc/metrics/src/dcutr.rs index 1632a68996d..fd18f3b3c22 100644 --- a/misc/metrics/src/dcutr.rs +++ b/misc/metrics/src/dcutr.rs @@ -66,9 +66,9 @@ impl From<&libp2p_dcutr::behaviour::Event> for EventType { remote_peer_id: _, remote_relayed_addr: _, } => EventType::RemoteInitiatedDirectConnectionUpgrade, - libp2p_dcutr::behaviour::Event::DirectConnectionUpgradeSucceeded { remote_peer_id: _ } => { - EventType::DirectConnectionUpgradeSucceeded - } + libp2p_dcutr::behaviour::Event::DirectConnectionUpgradeSucceeded { + remote_peer_id: _, + } => EventType::DirectConnectionUpgradeSucceeded, libp2p_dcutr::behaviour::Event::DirectConnectionUpgradeFailed { remote_peer_id: _, error: _, From b1cb5886cf28975f2ff803e2fbd96255af14b9ef Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 15 Jan 2022 21:37:06 +0100 Subject: [PATCH 03/22] protocols/dcutr: Reference merged DCUtR specification --- protocols/dcutr/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/dcutr/src/lib.rs b/protocols/dcutr/src/lib.rs index 1c8b525e95f..5451f272101 100644 --- a/protocols/dcutr/src/lib.rs +++ b/protocols/dcutr/src/lib.rs @@ -18,8 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! Implementation of the [libp2p direct connection upgrade through relay -//! specification](https://github.com/libp2p/specs/pull/173). +//! Implementation of the [libp2p Direct Connection Upgrade through Relay +//! specification](https://github.com/libp2p/specs/blob/master/relay/DCUtR.md). pub mod behaviour; mod handler; From 436d5659a347a81be69732410fe7b993b9e7b018 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 17 Jan 2022 16:44:18 +0100 Subject: [PATCH 04/22] protocols/dcutr: Dial as_listener when DCUtR server --- protocols/dcutr/src/behaviour.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index b4b829a383c..22f16d86fdd 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -276,6 +276,7 @@ impl NetworkBehaviour for Behaviour { opts: DialOpts::peer_id(event_source) .condition(dial_opts::PeerCondition::Always) .addresses(remote_addrs) + .override_role() .build(), handler: handler::Prototype::DirectConnection { relayed_connection_id: connection, From 7bc75a5216a32bf211d095a81a04cfba463df1ea Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 19 Jan 2022 15:47:01 +0100 Subject: [PATCH 05/22] protocols/dcutr: Filter relayed addresses on protocol level --- protocols/dcutr/src/protocol/inbound.rs | 9 +++++++-- protocols/dcutr/src/protocol/outbound.rs | 9 +++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/protocols/dcutr/src/protocol/inbound.rs b/protocols/dcutr/src/protocol/inbound.rs index 5b03f01491d..5e7bfcc0261 100644 --- a/protocols/dcutr/src/protocol/inbound.rs +++ b/protocols/dcutr/src/protocol/inbound.rs @@ -22,7 +22,7 @@ use crate::message_proto::{hole_punch, HolePunch}; use asynchronous_codec::Framed; use bytes::BytesMut; use futures::{future::BoxFuture, prelude::*}; -use libp2p_core::{upgrade, Multiaddr}; +use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr}; use libp2p_swarm::NegotiatedSubstream; use prost::Message; use std::convert::TryFrom; @@ -65,7 +65,12 @@ impl upgrade::InboundUpgrade for Upgrade { } else { obs_addrs .into_iter() - .map(TryFrom::try_from) + .map(Multiaddr::try_from) + // Filter out relayed addresses. + .filter(|a| match a { + Ok(a) => !a.iter().any(|p| p == Protocol::P2pCircuit), + Err(_) => true, + }) .collect::, _>>() .map_err(|_| UpgradeError::InvalidAddrs)? }; diff --git a/protocols/dcutr/src/protocol/outbound.rs b/protocols/dcutr/src/protocol/outbound.rs index 5a9f96ca28b..7717c8e228b 100644 --- a/protocols/dcutr/src/protocol/outbound.rs +++ b/protocols/dcutr/src/protocol/outbound.rs @@ -23,7 +23,7 @@ use asynchronous_codec::Framed; use bytes::BytesMut; use futures::{future::BoxFuture, prelude::*}; use futures_timer::Delay; -use libp2p_core::{upgrade, Multiaddr}; +use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr}; use libp2p_swarm::NegotiatedSubstream; use prost::Message; use std::convert::TryFrom; @@ -96,7 +96,12 @@ impl upgrade::OutboundUpgrade for Upgrade { } else { obs_addrs .into_iter() - .map(TryFrom::try_from) + .map(Multiaddr::try_from) + // Filter out relayed addresses. + .filter(|a| match a { + Ok(a) => !a.iter().any(|p| p == Protocol::P2pCircuit), + Err(_) => true, + }) .collect::, _>>() .map_err(|_| UpgradeError::InvalidAddrs)? }; From bafef6943b2aa9643be6ccd6425e6bd727235087 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 19 Jan 2022 15:49:56 +0100 Subject: [PATCH 06/22] protocols/dcutr: Fix intra-doc link --- protocols/dcutr/src/behaviour.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index 22f16d86fdd..669a9fec168 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -329,7 +329,7 @@ impl NetworkBehaviour for Behaviour { } /// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`] -/// before being returned in [`Relay::poll`]. +/// before being returned in [`Behaviour::poll`]. #[allow(clippy::large_enum_variant)] enum Action { Done(NetworkBehaviourAction), From 4a5461c0ba29410dce05fe957290abf3b3dcb0d2 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 1 Feb 2022 19:59:49 +0100 Subject: [PATCH 07/22] protocols/dcutr/tests: Remove unused libp2p-ping --- protocols/dcutr/Cargo.toml | 1 - protocols/dcutr/tests/lib.rs | 57 +++++------------------------------- 2 files changed, 8 insertions(+), 50 deletions(-) diff --git a/protocols/dcutr/Cargo.toml b/protocols/dcutr/Cargo.toml index 8bc45ad1f1d..af2991920c3 100644 --- a/protocols/dcutr/Cargo.toml +++ b/protocols/dcutr/Cargo.toml @@ -32,7 +32,6 @@ prost-build = "0.7" env_logger = "0.8.3" libp2p = { path = "../..", features = ["dcutr"] } libp2p-identify = { path = "../identify" } -libp2p-ping = { path = "../ping" } libp2p-plaintext = { path = "../../transports/plaintext" } libp2p-relay = { path = "../relay" } libp2p-yamux = { path = "../../muxers/yamux" } diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index 0614e7bc49c..64f41e5a831 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -30,7 +30,6 @@ use libp2p::core::transport::{Boxed, MemoryTransport, OrTransport, Transport}; use libp2p::core::PublicKey; use libp2p::core::{identity, PeerId}; use libp2p::dcutr; -use libp2p::ping::{Ping, PingConfig, PingEvent}; use libp2p::plaintext::PlainText2Config; use libp2p::relay::v2::client; use libp2p::relay::v2::relay; @@ -94,7 +93,7 @@ fn connect() { )); } -fn build_relay() -> Swarm { +fn build_relay() -> Swarm { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); let local_peer_id = local_public_key.clone().to_peer_id(); @@ -103,16 +102,13 @@ fn build_relay() -> Swarm { Swarm::new( transport, - Relay { - ping: Ping::new(PingConfig::new()), - relay: relay::Relay::new( - local_peer_id, - relay::Config { - reservation_duration: Duration::from_secs(2), - ..Default::default() - }, - ), - }, + relay::Relay::new( + local_peer_id, + relay::Config { + reservation_duration: Duration::from_secs(2), + ..Default::default() + }, + ), local_peer_id, ) } @@ -131,7 +127,6 @@ fn build_client() -> Swarm { Swarm::new( transport, Client { - ping: Ping::new(PingConfig::new()), relay: behaviour, dcutr: dcutr::behaviour::Behaviour::new(), }, @@ -155,43 +150,16 @@ where transport } -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "RelayEvent", event_process = false)] -struct Relay { - relay: relay::Relay, - ping: Ping, -} - -#[derive(Debug)] -enum RelayEvent { - Relay(relay::Event), - Ping(PingEvent), -} - -impl From for RelayEvent { - fn from(event: relay::Event) -> Self { - RelayEvent::Relay(event) - } -} - -impl From for RelayEvent { - fn from(event: PingEvent) -> Self { - RelayEvent::Ping(event) - } -} - #[derive(NetworkBehaviour)] #[behaviour(out_event = "ClientEvent", event_process = false)] struct Client { relay: client::Client, - ping: Ping, dcutr: dcutr::behaviour::Behaviour, } #[derive(Debug)] enum ClientEvent { Relay(client::Event), - Ping(PingEvent), Dcutr(dcutr::behaviour::Event), } @@ -201,12 +169,6 @@ impl From for ClientEvent { } } -impl From for ClientEvent { - fn from(event: PingEvent) -> Self { - ClientEvent::Ping(event) - } -} - impl From for ClientEvent { fn from(event: dcutr::behaviour::Event) -> Self { ClientEvent::Dcutr(event) @@ -233,7 +195,6 @@ async fn wait_for_reservation( renewal, .. })) if relay_peer_id == peer_id && renewal == is_renewal => break, - SwarmEvent::Behaviour(ClientEvent::Ping(_)) => {} SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {} SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {} e => panic!("{:?}", e), @@ -257,7 +218,6 @@ async fn wait_for_connection_established(client: &mut Swarm, addr: &Mult break } SwarmEvent::Dialing(_) => {} - SwarmEvent::Behaviour(ClientEvent::Ping(_)) => {} SwarmEvent::Behaviour(ClientEvent::Relay( client::Event::OutboundCircuitEstablished { .. }, )) => {} @@ -278,7 +238,6 @@ async fn wait_for_dcutr_event(client: &mut Swarm) -> dcutr::behaviour::E loop { match client.select_next_some().await { SwarmEvent::Behaviour(ClientEvent::Dcutr(e)) => return e, - SwarmEvent::Behaviour(ClientEvent::Ping(_)) => {} e => panic!("{:?}", e), } } From e7fa5e9c5ee2354a27218ef830ec9e7e92cd9a15 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 1 Feb 2022 20:32:17 +0100 Subject: [PATCH 08/22] protocols/dcutr: Fix expect message typo --- protocols/dcutr/src/behaviour.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index 669a9fec168..419336aaef1 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -149,7 +149,7 @@ impl NetworkBehaviour for Behaviour { role: handler::Role::Initiator { attempt }, } => { let peer_id = - peer_id.expect("Prototype::DirectConnection is always for known peer."); + peer_id.expect("Peer of `Prototype::DirectConnection` is always known."); if attempt < 3 { self.queued_actions.push_back(Action::Connect { peer_id, From 831deb5625a58d3b2b65e7873d6bd0d58561aa4c Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 1 Feb 2022 20:34:13 +0100 Subject: [PATCH 09/22] protocols/dcutr: Remove empty Behaviour::inject_connected --- protocols/dcutr/src/behaviour.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index 419336aaef1..12258c3603e 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -92,8 +92,6 @@ impl NetworkBehaviour for Behaviour { vec![] } - fn inject_connected(&mut self, _peer_id: &PeerId) {} - fn inject_connection_established( &mut self, peer_id: &PeerId, From c1deddb66c1766d1f1e7503e6d0bd57c6e4a29ec Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 1 Feb 2022 20:35:55 +0100 Subject: [PATCH 10/22] protocols/dcutr: Rename to InitiatedDirectConnectionUpgrade --- misc/metrics/src/dcutr.rs | 2 +- protocols/dcutr/src/behaviour.rs | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/misc/metrics/src/dcutr.rs b/misc/metrics/src/dcutr.rs index fd18f3b3c22..912e59d0fec 100644 --- a/misc/metrics/src/dcutr.rs +++ b/misc/metrics/src/dcutr.rs @@ -58,7 +58,7 @@ enum EventType { impl From<&libp2p_dcutr::behaviour::Event> for EventType { fn from(event: &libp2p_dcutr::behaviour::Event) -> Self { match event { - libp2p_dcutr::behaviour::Event::InitiateDirectConnectionUpgrade { + libp2p_dcutr::behaviour::Event::InitiatedDirectConnectionUpgrade { remote_peer_id: _, local_relayed_addr: _, } => EventType::InitiateDirectConnectionUpgrade, diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index 12258c3603e..fa9b31b5612 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -38,7 +38,7 @@ use thiserror::Error; /// The events produced by the [`Behaviour`]. #[derive(Debug)] pub enum Event { - InitiateDirectConnectionUpgrade { + InitiatedDirectConnectionUpgrade { remote_peer_id: PeerId, local_relayed_addr: Multiaddr, }, @@ -120,10 +120,12 @@ impl NetworkBehaviour for Behaviour { ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."), }; self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::InitiateDirectConnectionUpgrade { - remote_peer_id: *peer_id, - local_relayed_addr: local_addr.clone(), - }) + NetworkBehaviourAction::GenerateEvent( + Event::InitiatedDirectConnectionUpgrade { + remote_peer_id: *peer_id, + local_relayed_addr: local_addr.clone(), + }, + ) .into(), ); } From 128d8015d6e6dc68c824292d661f1ee62029fb49 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 1 Feb 2022 20:44:29 +0100 Subject: [PATCH 11/22] protocols/dcutr/examples: Use async instead of poll_fn --- protocols/dcutr/examples/client.rs | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/protocols/dcutr/examples/client.rs b/protocols/dcutr/examples/client.rs index 4da42198ec9..7654e57701e 100644 --- a/protocols/dcutr/examples/client.rs +++ b/protocols/dcutr/examples/client.rs @@ -39,7 +39,6 @@ use std::convert::TryInto; use std::error::Error; use std::net::Ipv4Addr; use std::str::FromStr; -use std::task::{Context, Poll}; use structopt::StructOpt; #[derive(Debug, StructOpt)] @@ -230,39 +229,34 @@ fn main() -> Result<(), Box> { .unwrap(); } - block_on(futures::future::poll_fn(move |cx: &mut Context<'_>| { + block_on(async { loop { - match swarm.poll_next_unpin(cx) { - Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => { + match swarm.next().await.unwrap() { + SwarmEvent::NewListenAddr { address, .. } => { info!("Listening on {:?}", address); } - Poll::Ready(Some(SwarmEvent::Behaviour(Event::Relay(event)))) => { + SwarmEvent::Behaviour(Event::Relay(event)) => { info!("{:?}", event) } - Poll::Ready(Some(SwarmEvent::Behaviour(Event::Dcutr(event)))) => { + SwarmEvent::Behaviour(Event::Dcutr(event)) => { info!("{:?}", event) } - Poll::Ready(Some(SwarmEvent::Behaviour(Event::Identify(event)))) => { + SwarmEvent::Behaviour(Event::Identify(event)) => { info!("{:?}", event) } - Poll::Ready(Some(SwarmEvent::Behaviour(Event::Ping(_)))) => {} - Poll::Ready(Some(SwarmEvent::ConnectionEstablished { + SwarmEvent::Behaviour(Event::Ping(_)) => {} + SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. - })) => { + } => { info!("Established connection to {:?} via {:?}", peer_id, endpoint); } - Poll::Ready(Some(SwarmEvent::OutgoingConnectionError { peer_id, error })) => { + SwarmEvent::OutgoingConnectionError { peer_id, error } => { info!("Outgoing connection error to {:?}: {:?}", peer_id, error); } - Poll::Ready(Some(_)) => {} - Poll::Ready(None) => return Poll::Ready(Ok(())), - Poll::Pending => { - break; - } + _ => {} } } - Poll::Pending - })) + }) } fn generate_ed25519(secret_key_seed: u8) -> identity::Keypair { From 10046f21c644118a46dac74c25a3b1c927d532ae Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 1 Feb 2022 20:46:20 +0100 Subject: [PATCH 12/22] protocols/dcutr/examples: Replace matches! with == --- protocols/dcutr/examples/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/dcutr/examples/client.rs b/protocols/dcutr/examples/client.rs index 7654e57701e..4e9da81f55f 100644 --- a/protocols/dcutr/examples/client.rs +++ b/protocols/dcutr/examples/client.rs @@ -61,7 +61,7 @@ struct Opts { remote_peer_id: Option, } -#[derive(Debug, StructOpt)] +#[derive(Debug, StructOpt, PartialEq)] enum Mode { Dial, Listen, @@ -218,7 +218,7 @@ fn main() -> Result<(), Box> { } }); - if matches!(opts.mode, Mode::Dial) { + if opts.mode == Mode::Dial { swarm .dial( opts.relay_address From 21ae03bbe9a30bc4ce567511f81343d0af54043c Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 1 Feb 2022 20:51:51 +0100 Subject: [PATCH 13/22] protocols/dcutr/examples: Document delay hack --- protocols/dcutr/examples/client.rs | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/protocols/dcutr/examples/client.rs b/protocols/dcutr/examples/client.rs index 4e9da81f55f..af848817445 100644 --- a/protocols/dcutr/examples/client.rs +++ b/protocols/dcutr/examples/client.rs @@ -165,23 +165,24 @@ fn main() -> Result<(), Box> { ) .unwrap(); - // Wait to listen on localhost. + // Wait to listen on all interfaces. block_on(async { let mut delay = futures_timer::Delay::new(std::time::Duration::from_secs(1)).fuse(); loop { futures::select! { - event = swarm.next() => { - match event.unwrap() { - SwarmEvent::NewListenAddr { address, .. } => { - info!("Listening on {:?}", address); - } - event => panic!("{:?}", event), - } - } - _ = delay => { - break; - } + event = swarm.next() => { + match event.unwrap() { + SwarmEvent::NewListenAddr { address, .. } => { + info!("Listening on {:?}", address); } + event => panic!("{:?}", event), + } + } + _ = delay => { + // Likely listening on all interfaces now, thus continuing by breaking the loop. + break; + } + } } }); From da5e48f10d7eee80a97d7d2f8c349b30696eec11 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 1 Feb 2022 20:53:51 +0100 Subject: [PATCH 14/22] protocols/dcutr/examples: Remove outdated comment on relay mode --- protocols/dcutr/examples/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/dcutr/examples/client.rs b/protocols/dcutr/examples/client.rs index af848817445..64453f62c44 100644 --- a/protocols/dcutr/examples/client.rs +++ b/protocols/dcutr/examples/client.rs @@ -44,11 +44,11 @@ use structopt::StructOpt; #[derive(Debug, StructOpt)] #[structopt(name = "libp2p DCUtR client")] struct Opts { - /// The mode (relay, client-listen, client-dial) + /// The mode (client-listen, client-dial). #[structopt(long)] mode: Mode, - /// Fixed value to generate deterministic peer id + /// Fixed value to generate deterministic peer id. #[structopt(long)] secret_key_seed: u8, From 3b75add8221aa3f3e02c7c8e5f99101bf636f8e1 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 2 Feb 2022 11:38:09 +0100 Subject: [PATCH 15/22] protocols/dcutr/src/behaviour: Consolidate obs_addrs collection --- protocols/dcutr/src/behaviour.rs | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index fa9b31b5612..af53084defa 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -356,6 +356,17 @@ impl Action { self, poll_parameters: &mut impl PollParameters, ) -> NetworkBehaviourAction { + let obs_addrs = || { + poll_parameters + .external_addresses() + .filter(|a| !a.addr.iter().any(|p| p == Protocol::P2pCircuit)) + .map(|a| { + a.addr + .with(Protocol::P2p((*poll_parameters.local_peer_id()).into())) + }) + .collect() + }; + match self { Action::Done(action) => action, Action::AcceptInboundConnect { @@ -367,14 +378,7 @@ impl Action { peer_id, event: Either::Left(handler::relayed::Command::AcceptInboundConnect { inbound_connect, - obs_addrs: poll_parameters - .external_addresses() - .filter(|a| !a.addr.iter().any(|p| p == Protocol::P2pCircuit)) - .map(|a| { - a.addr - .with(Protocol::P2p((*poll_parameters.local_peer_id()).into())) - }) - .collect(), + obs_addrs: obs_addrs(), }), }, Action::Connect { @@ -386,14 +390,7 @@ impl Action { peer_id, event: Either::Left(handler::relayed::Command::Connect { attempt, - obs_addrs: poll_parameters - .external_addresses() - .filter(|a| !a.addr.iter().any(|p| p == Protocol::P2pCircuit)) - .map(|a| { - a.addr - .with(Protocol::P2p((*poll_parameters.local_peer_id()).into())) - }) - .collect(), + obs_addrs: obs_addrs(), }), }, } From 9d998502cfb5d6eb8e789a028caf2b30a52117ad Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 2 Feb 2022 16:18:35 +0100 Subject: [PATCH 16/22] protocols/dcutr: Document reversed connection and substream roles --- protocols/dcutr/src/handler/relayed.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index bafbf880752..047f540b693 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -180,6 +180,11 @@ impl ProtocolsHandler for Handler { SubstreamProtocol::new(upgrade::EitherUpgrade::A(protocol::inbound::Upgrade {}), ()) } ConnectedPoint::Listener { .. } => { + // By the protocol specification the listening side of a relayed connection + // initiates the _direct connection upgrade_. In other words the listening side of + // the relayed connection opens a substream to the dialing side. (Connection roles + // and substream roles are reversed.) The listening side on a relayed connection + // never expects incoming substreams, hence the denied upgrade below. SubstreamProtocol::new(upgrade::EitherUpgrade::B(DeniedUpgrade), ()) } } @@ -192,7 +197,7 @@ impl ProtocolsHandler for Handler { ) { match output { EitherOutput::First(inbound_connect) => { - let remote_addr = match & self.endpoint { + let remote_addr = match &self.endpoint { ConnectedPoint::Dialer { address, role_override: _ } => address.clone(), ConnectedPoint::Listener { ..} => unreachable!("`::listen_protocol` denies all incoming substreams as a listener."), }; From 0194655f661f02c5afcef6234cbdd2af28c7045c Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 2 Feb 2022 16:28:29 +0100 Subject: [PATCH 17/22] protocols/dcutr/src/handler: Don't KeepAlive::No on all errors --- protocols/dcutr/src/handler/relayed.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 047f540b693..b44af7cd59c 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -261,8 +261,6 @@ impl ProtocolsHandler for Handler { _: Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>, ) { - self.keep_alive = KeepAlive::No; - match error { ProtocolsHandlerUpgrErr::Timeout => { self.queued_events.push_back(ProtocolsHandlerEvent::Custom( @@ -282,6 +280,7 @@ impl ProtocolsHandler for Handler { // The remote merely doesn't support the DCUtR protocol. // This is no reason to close the connection, which may // successfully communicate with other protocols already. + self.keep_alive = KeepAlive::No; self.queued_events.push_back(ProtocolsHandlerEvent::Custom( Event::InboundNegotiationFailed { error: ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select( From ce93ec2e6677798803593232c1895d3f836ad1dc Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 2 Feb 2022 16:39:58 +0100 Subject: [PATCH 18/22] protocols/dcutr: Extract MAX_NUMBER_OF_UPGRADE_ATTEMPTS constant --- protocols/dcutr/src/behaviour.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index af53084defa..3e4f742505d 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -35,6 +35,8 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::task::{Context, Poll}; use thiserror::Error; +const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3; + /// The events produced by the [`Behaviour`]. #[derive(Debug)] pub enum Event { @@ -150,7 +152,7 @@ impl NetworkBehaviour for Behaviour { } => { let peer_id = peer_id.expect("Peer of `Prototype::DirectConnection` is always known."); - if attempt < 3 { + if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { self.queued_actions.push_back(Action::Connect { peer_id, handler: NotifyHandler::One(relayed_connection_id), From 354900fd1eafa108d9f70f54ec5c15ae3527656b Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 2 Feb 2022 17:52:19 +0100 Subject: [PATCH 19/22] protocols/dcutr: Implement Encoder and Decoder for DCUtR prost messages --- protocols/dcutr/src/protocol.rs | 3 +- protocols/dcutr/src/protocol/codec.rs | 88 ++++++++++++++++++++++++ protocols/dcutr/src/protocol/inbound.rs | 57 ++++++--------- protocols/dcutr/src/protocol/outbound.rs | 48 ++++--------- 4 files changed, 126 insertions(+), 70 deletions(-) create mode 100644 protocols/dcutr/src/protocol/codec.rs diff --git a/protocols/dcutr/src/protocol.rs b/protocols/dcutr/src/protocol.rs index d2b8b39a6d0..a69a27f79cf 100644 --- a/protocols/dcutr/src/protocol.rs +++ b/protocols/dcutr/src/protocol.rs @@ -18,9 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +mod codec; pub mod inbound; pub mod outbound; const PROTOCOL_NAME: &[u8; 13] = b"/libp2p/dcutr"; - -const MAX_MESSAGE_SIZE_BYTES: usize = 4096; diff --git a/protocols/dcutr/src/protocol/codec.rs b/protocols/dcutr/src/protocol/codec.rs new file mode 100644 index 00000000000..9706d756e9e --- /dev/null +++ b/protocols/dcutr/src/protocol/codec.rs @@ -0,0 +1,88 @@ +// Copyright 2022 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::message_proto; +use bytes::BytesMut; +use prost::Message; +use std::io::Cursor; +use thiserror::Error; +use unsigned_varint::codec::UviBytes; + +const MAX_MESSAGE_SIZE_BYTES: usize = 4096; + +pub struct Codec(UviBytes); + +impl Codec { + pub fn new() -> Self { + let mut codec = UviBytes::default(); + codec.set_max_len(MAX_MESSAGE_SIZE_BYTES); + Self(codec) + } +} + +impl asynchronous_codec::Encoder for Codec { + type Item = message_proto::HolePunch; + type Error = Error; + + fn encode( + &mut self, + item: Self::Item, + dst: &mut asynchronous_codec::BytesMut, + ) -> Result<(), Self::Error> { + let mut encoded_msg = BytesMut::new(); + item.encode(&mut encoded_msg) + .expect("BytesMut to have sufficient capacity."); + self.0 + .encode(encoded_msg.freeze(), dst) + .map_err(|e| e.into()) + } +} + +impl asynchronous_codec::Decoder for Codec { + type Item = message_proto::HolePunch; + type Error = Error; + + fn decode( + &mut self, + src: &mut asynchronous_codec::BytesMut, + ) -> Result, Self::Error> { + Ok(self + .0 + .decode(src)? + .map(|msg| message_proto::HolePunch::decode(Cursor::new(msg))) + .transpose()?) + } +} + +#[derive(Debug, Error)] +pub enum Error { + #[error("Failed to decode response: {0}.")] + Decode( + #[from] + #[source] + prost::DecodeError, + ), + #[error("Io error {0}")] + Io( + #[from] + #[source] + std::io::Error, + ), +} diff --git a/protocols/dcutr/src/protocol/inbound.rs b/protocols/dcutr/src/protocol/inbound.rs index 5e7bfcc0261..c05e1072b8f 100644 --- a/protocols/dcutr/src/protocol/inbound.rs +++ b/protocols/dcutr/src/protocol/inbound.rs @@ -20,16 +20,12 @@ use crate::message_proto::{hole_punch, HolePunch}; use asynchronous_codec::Framed; -use bytes::BytesMut; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr}; use libp2p_swarm::NegotiatedSubstream; -use prost::Message; use std::convert::TryFrom; -use std::io::Cursor; use std::iter; use thiserror::Error; -use unsigned_varint::codec::UviBytes; pub struct Upgrade {} @@ -48,17 +44,17 @@ impl upgrade::InboundUpgrade for Upgrade { type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { - let mut codec = UviBytes::default(); - codec.set_max_len(super::MAX_MESSAGE_SIZE_BYTES); - let mut substream = Framed::new(substream, codec); + let mut substream = Framed::new(substream, super::codec::Codec::new()); async move { - let msg: bytes::BytesMut = substream - .next() - .await - .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??; - - let HolePunch { r#type, obs_addrs } = HolePunch::decode(Cursor::new(msg))?; + let HolePunch { r#type, obs_addrs } = + substream + .next() + .await + .ok_or(super::codec::Error::Io(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "", + )))??; let obs_addrs = if obs_addrs.is_empty() { return Err(UpgradeError::NoAddresses); @@ -92,7 +88,7 @@ impl upgrade::InboundUpgrade for Upgrade { } pub struct PendingConnect { - substream: Framed, + substream: Framed, remote_obs_addrs: Vec, } @@ -106,18 +102,15 @@ impl PendingConnect { obs_addrs: local_obs_addrs.into_iter().map(|a| a.to_vec()).collect(), }; - let mut encoded_msg = BytesMut::new(); - msg.encode(&mut encoded_msg) - .expect("BytesMut to have sufficient capacity."); - - self.substream.send(encoded_msg.freeze()).await?; - let msg: bytes::BytesMut = self - .substream - .next() - .await - .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??; - - let HolePunch { r#type, .. } = HolePunch::decode(Cursor::new(msg))?; + self.substream.send(msg).await?; + let HolePunch { r#type, .. } = + self.substream + .next() + .await + .ok_or(super::codec::Error::Io(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "", + )))??; let r#type = hole_punch::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?; match r#type { @@ -131,17 +124,11 @@ impl PendingConnect { #[derive(Debug, Error)] pub enum UpgradeError { - #[error("Failed to decode response: {0}.")] - Decode( - #[from] - #[source] - prost::DecodeError, - ), - #[error("Io error {0}")] - Io( + #[error("Failed to encode or decode: {0}")] + Codec( #[from] #[source] - std::io::Error, + super::codec::Error, ), #[error("Expected at least one address in reservation.")] NoAddresses, diff --git a/protocols/dcutr/src/protocol/outbound.rs b/protocols/dcutr/src/protocol/outbound.rs index 7717c8e228b..332554c276a 100644 --- a/protocols/dcutr/src/protocol/outbound.rs +++ b/protocols/dcutr/src/protocol/outbound.rs @@ -20,18 +20,14 @@ use crate::message_proto::{hole_punch, HolePunch}; use asynchronous_codec::Framed; -use bytes::BytesMut; use futures::{future::BoxFuture, prelude::*}; use futures_timer::Delay; use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr}; use libp2p_swarm::NegotiatedSubstream; -use prost::Message; use std::convert::TryFrom; -use std::io::Cursor; use std::iter; use std::time::Instant; use thiserror::Error; -use unsigned_varint::codec::UviBytes; pub struct Upgrade { obs_addrs: Vec, @@ -58,33 +54,29 @@ impl upgrade::OutboundUpgrade for Upgrade { type Future = BoxFuture<'static, Result>; fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + let mut substream = Framed::new(substream, super::codec::Codec::new()); + let msg = HolePunch { r#type: hole_punch::Type::Connect.into(), obs_addrs: self.obs_addrs.into_iter().map(|a| a.to_vec()).collect(), }; - let mut encoded_msg = BytesMut::new(); - msg.encode(&mut encoded_msg) - .expect("BytesMut to have sufficient capacity."); - - let mut codec = UviBytes::default(); - codec.set_max_len(super::MAX_MESSAGE_SIZE_BYTES); - let mut substream = Framed::new(substream, codec); - async move { - substream.send(encoded_msg.freeze()).await?; + substream.send(msg).await?; let sent_time = Instant::now(); - let msg: bytes::BytesMut = substream - .next() - .await - .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??; + let HolePunch { r#type, obs_addrs } = + substream + .next() + .await + .ok_or(super::codec::Error::Io(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "", + )))??; let rtt = sent_time.elapsed(); - let HolePunch { r#type, obs_addrs } = HolePunch::decode(Cursor::new(msg))?; - let r#type = hole_punch::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?; match r#type { hole_punch::Type::Connect => {} @@ -111,11 +103,7 @@ impl upgrade::OutboundUpgrade for Upgrade { obs_addrs: vec![], }; - let mut encoded_msg = BytesMut::new(); - msg.encode(&mut encoded_msg) - .expect("BytesMut to have sufficient capacity."); - - substream.send(encoded_msg.freeze()).await?; + substream.send(msg).await?; Delay::new(rtt / 2).await; @@ -131,17 +119,11 @@ pub struct Connect { #[derive(Debug, Error)] pub enum UpgradeError { - #[error("Failed to decode response: {0}.")] - Decode( - #[from] - #[source] - prost::DecodeError, - ), - #[error("Io error {0}")] - Io( + #[error("Failed to encode or decode: {0}")] + Codec( #[from] #[source] - std::io::Error, + super::codec::Error, ), #[error("Expected 'status' field to be set.")] MissingStatusField, From b50708b7fe237e59ae549499a6b992a664386b60 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 2 Feb 2022 20:02:28 +0100 Subject: [PATCH 20/22] protocols/dcutr: Use extend instead of double push_back --- protocols/dcutr/src/behaviour.rs | 50 ++++++++++++++------------------ 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index 3e4f742505d..faa2698d62c 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -112,24 +112,23 @@ impl NetworkBehaviour for Behaviour { // connection upgrade by initiating a direct connection to A. // // https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol - self.queued_actions.push_back(Action::Connect { - peer_id: *peer_id, - attempt: 1, - handler: NotifyHandler::One(*connection_id), - }); - let local_addr = match connected_point { - ConnectedPoint::Listener { local_addr, .. } => local_addr, - ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."), - }; - self.queued_actions.push_back( + self.queued_actions.extend([ + Action::Connect { + peer_id: *peer_id, + attempt: 1, + handler: NotifyHandler::One(*connection_id), + }, NetworkBehaviourAction::GenerateEvent( Event::InitiatedDirectConnectionUpgrade { remote_peer_id: *peer_id, - local_relayed_addr: local_addr.clone(), + local_relayed_addr: match connected_point { + ConnectedPoint::Listener { local_addr, .. } => local_addr.clone(), + ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."), + }, }, ) .into(), - ); + ]); } } else { self.direct_connections @@ -159,7 +158,7 @@ impl NetworkBehaviour for Behaviour { attempt: attempt + 1, }); } else { - self.queued_actions.push_back( + self.queued_actions.extend([ NetworkBehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::One(relayed_connection_id), @@ -168,8 +167,6 @@ impl NetworkBehaviour for Behaviour { ), } .into(), - ); - self.queued_actions.push_back( NetworkBehaviourAction::GenerateEvent( Event::DirectConnectionUpgradeFailed { remote_peer_id: peer_id, @@ -177,7 +174,7 @@ impl NetworkBehaviour for Behaviour { }, ) .into(), - ); + ]); } } _ => {} @@ -221,12 +218,12 @@ impl NetworkBehaviour for Behaviour { inbound_connect, remote_addr, }) => { - self.queued_actions.push_back(Action::AcceptInboundConnect { - peer_id: event_source, - handler: NotifyHandler::One(connection), - inbound_connect, - }); - self.queued_actions.push_back( + self.queued_actions.extend([ + Action::AcceptInboundConnect { + peer_id: event_source, + handler: NotifyHandler::One(connection), + inbound_connect, + }, NetworkBehaviourAction::GenerateEvent( Event::RemoteInitiatedDirectConnectionUpgrade { remote_peer_id: event_source, @@ -234,7 +231,7 @@ impl NetworkBehaviour for Behaviour { }, ) .into(), - ); + ]); } Either::Left(handler::relayed::Event::InboundNegotiationFailed { error }) => { self.queued_actions.push_back( @@ -293,7 +290,7 @@ impl NetworkBehaviour for Behaviour { relayed_connection_id, }, )) => { - self.queued_actions.push_back( + self.queued_actions.extend([ NetworkBehaviourAction::NotifyHandler { peer_id: event_source, handler: NotifyHandler::One(relayed_connection_id), @@ -302,16 +299,13 @@ impl NetworkBehaviour for Behaviour { ), } .into(), - ); - self.queued_actions.push_back( NetworkBehaviourAction::GenerateEvent( Event::DirectConnectionUpgradeSucceeded { remote_peer_id: event_source, }, ) .into(), - ); - return; + ]); } Either::Right(Either::Right(event)) => void::unreachable(event), }; From f18a8638e310c1ba87ddd3ce1e08baf2046fb59a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 2 Feb 2022 20:33:27 +0100 Subject: [PATCH 21/22] protocols/dcutr: Rename Action to ActionBuilder --- protocols/dcutr/src/behaviour.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index faa2698d62c..fe8bbe42b2f 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -67,7 +67,7 @@ pub enum UpgradeError { pub struct Behaviour { /// Queue of actions to return when polled. - queued_actions: VecDeque, + queued_actions: VecDeque, /// All direct (non-relayed) connections. direct_connections: HashMap>, @@ -113,7 +113,7 @@ impl NetworkBehaviour for Behaviour { // // https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol self.queued_actions.extend([ - Action::Connect { + ActionBuilder::Connect { peer_id: *peer_id, attempt: 1, handler: NotifyHandler::One(*connection_id), @@ -152,7 +152,7 @@ impl NetworkBehaviour for Behaviour { let peer_id = peer_id.expect("Peer of `Prototype::DirectConnection` is always known."); if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { - self.queued_actions.push_back(Action::Connect { + self.queued_actions.push_back(ActionBuilder::Connect { peer_id, handler: NotifyHandler::One(relayed_connection_id), attempt: attempt + 1, @@ -219,7 +219,7 @@ impl NetworkBehaviour for Behaviour { remote_addr, }) => { self.queued_actions.extend([ - Action::AcceptInboundConnect { + ActionBuilder::AcceptInboundConnect { peer_id: event_source, handler: NotifyHandler::One(connection), inbound_connect, @@ -327,7 +327,7 @@ impl NetworkBehaviour for Behaviour { /// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`] /// before being returned in [`Behaviour::poll`]. #[allow(clippy::large_enum_variant)] -enum Action { +enum ActionBuilder { Done(NetworkBehaviourAction), Connect { attempt: u8, @@ -341,13 +341,13 @@ enum Action { }, } -impl From> for Action { +impl From> for ActionBuilder { fn from(action: NetworkBehaviourAction) -> Self { Self::Done(action) } } -impl Action { +impl ActionBuilder { fn build( self, poll_parameters: &mut impl PollParameters, @@ -364,8 +364,8 @@ impl Action { }; match self { - Action::Done(action) => action, - Action::AcceptInboundConnect { + ActionBuilder::Done(action) => action, + ActionBuilder::AcceptInboundConnect { inbound_connect, handler, peer_id, @@ -377,7 +377,7 @@ impl Action { obs_addrs: obs_addrs(), }), }, - Action::Connect { + ActionBuilder::Connect { attempt, handler, peer_id, From 7934db575de3cf6bd236172e91a4e4d7bf4f5740 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 8 Feb 2022 15:33:12 +0100 Subject: [PATCH 22/22] protocols/dcutr/tests: Fix flaky test --- protocols/dcutr/tests/lib.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index 64f41e5a831..a53a5319f34 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -187,25 +187,32 @@ async fn wait_for_reservation( relay_peer_id: PeerId, is_renewal: bool, ) { + let mut new_listen_addr_for_relayed_addr = false; + let mut reservation_req_accepted = false; loop { match client.select_next_some().await { SwarmEvent::NewListenAddr { address, .. } if address != client_addr => {} + SwarmEvent::NewListenAddr { address, .. } if address == client_addr => { + new_listen_addr_for_relayed_addr = true; + if reservation_req_accepted { + break; + } + } SwarmEvent::Behaviour(ClientEvent::Relay(client::Event::ReservationReqAccepted { relay_peer_id: peer_id, renewal, .. - })) if relay_peer_id == peer_id && renewal == is_renewal => break, + })) if relay_peer_id == peer_id && renewal == is_renewal => { + reservation_req_accepted = true; + if new_listen_addr_for_relayed_addr { + break; + } + } SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {} SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {} e => panic!("{:?}", e), } } - - // Wait for `NewListenAddr` event. - match client.select_next_some().await { - SwarmEvent::NewListenAddr { address, .. } if address == client_addr => {} - e => panic!("{:?}", e), - } } async fn wait_for_connection_established(client: &mut Swarm, addr: &Multiaddr) {