diff --git a/Cargo.toml b/Cargo.toml index ed727247bd2..0ca47ec615e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ default = [ "ping", "plaintext", "pnet", + "quic", "secio", "secp256k1", "tcp-async-std", @@ -43,6 +44,7 @@ noise = ["libp2p-noise"] ping = ["libp2p-ping"] plaintext = ["libp2p-plaintext"] pnet = ["libp2p-pnet"] +quic = ["libp2p-quic"] secio = ["libp2p-secio"] tcp-async-std = ["libp2p-tcp", "libp2p-tcp/async-std"] tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"] @@ -84,6 +86,7 @@ libp2p-deflate = { version = "0.19.2", path = "protocols/deflate", optional = tr libp2p-dns = { version = "0.19.0", path = "transports/dns", optional = true } libp2p-mdns = { version = "0.19.2", path = "protocols/mdns", optional = true } libp2p-tcp = { version = "0.19.2", path = "transports/tcp", optional = true } +libp2p-quic = { version = "0.19.0", path = "transports/quic", optional = true } libp2p-websocket = { version = "0.20.0", path = "transports/websocket", optional = true } [dev-dependencies] @@ -110,8 +113,16 @@ members = [ "protocols/secio", "swarm", "transports/dns", + "transports/quic", "transports/tcp", "transports/uds", + "transports/wasm-ext", "transports/websocket", - "transports/wasm-ext" ] + +[patch.crates-io] +# TODO: waiting for a release +rustls = { git = "https://github.com/ctz/rustls", rev = "cac66a8c184f3ef8510bb62b390f241c2760f51d" } +# TODO: overwrite necessary because quinn-proto 0.6.1 is incompatible with the rustls overwrite above +# TODO: UUUUUUUGHHHHHH +quinn-proto = { git = "https://github.com/djc/quinn", rev = "6e9f7e8063926f8bdab56bbc1615277ff0bdece5" } diff --git a/LICENSE b/LICENSE index d4bb412c336..da84696303f 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright 2017 Parity Technologies (UK) Ltd. +Copyright 2017-2020 Parity Technologies (UK) Ltd. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/core/src/lib.rs b/core/src/lib.rs index a4e66486b41..e0631b8933f 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -23,14 +23,14 @@ //! The main concepts of libp2p-core are: //! //! - A [`PeerId`] is a unique global identifier for a node on the network. -//! Each node must have a different `PeerId`. Normally, a `PeerId` is the +//! Each node must have a different [`PeerId`]. Normally, a [`PeerId`] is the //! hash of the public key used to negotiate encryption on the //! communication channel, thereby guaranteeing that they cannot be spoofed. //! - The [`Transport`] trait defines how to reach a remote node or listen for -//! incoming remote connections. See the `transport` module. +//! incoming remote connections. See the [`transport`] module. //! - The [`StreamMuxer`] trait is implemented on structs that hold a connection //! to a remote and can subdivide this connection into multiple substreams. -//! See the `muxing` module. +//! See the [`muxing`] module. //! - The [`UpgradeInfo`], [`InboundUpgrade`] and [`OutboundUpgrade`] traits //! define how to upgrade each individual substream to use a protocol. //! See the `upgrade` module. diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 1a83d585f3b..03445fd5928 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -90,6 +90,10 @@ pub trait StreamMuxer { /// Only the latest task that was used to call this method may be notified. /// /// An error can be generated if the connection has been closed. + /// + /// Polling for an inbound substream is guaranteed to be performed continuously for as long + /// as the [`StreamMuxer`] as a whole is alive, and can therefore be used to drive some + /// connection-wide background processing such as sending connection-wide PINGs/PONGs. fn poll_inbound(&self, cx: &mut Context) -> Poll>; /// Opens a new outgoing substream, and produces the equivalent to a future that will be diff --git a/misc/multiaddr/Cargo.toml b/misc/multiaddr/Cargo.toml index 620dc637f59..263d7ed5e8a 100644 --- a/misc/multiaddr/Cargo.toml +++ b/misc/multiaddr/Cargo.toml @@ -20,6 +20,10 @@ static_assertions = "1.1" unsigned-varint = "0.4" url = { version = "2.1.0", default-features = false } +[target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies] +get_if_addrs = "0.5.3" +ipnet = "2.1.0" + [dev-dependencies] bincode = "1" quickcheck = "0.9.0" diff --git a/misc/multiaddr/src/lib.rs b/misc/multiaddr/src/lib.rs index f81b8ada763..5dafe7e486e 100644 --- a/misc/multiaddr/src/lib.rs +++ b/misc/multiaddr/src/lib.rs @@ -303,6 +303,37 @@ impl TryFrom> for Multiaddr { } } +/// Collect all local host addresses and use the provided port number as listen port. +#[cfg(not(any(target_os = "emscripten", target_os = "unknown")))] +pub fn host_addresses(suffix: &[Protocol]) -> io::Result> { + use get_if_addrs::{get_if_addrs, IfAddr}; + use ipnet::{IpNet, Ipv4Net, Ipv6Net}; + let mut addrs = Vec::new(); + for iface in get_if_addrs()? { + let ip = iface.ip(); + let mut ma = Multiaddr::from(ip); + for proto in suffix { + ma = ma.with(proto.clone()) + } + let ipn = match iface.addr { + IfAddr::V4(ip4) => { + let prefix_len = (!u32::from_be_bytes(ip4.netmask.octets())).leading_zeros(); + let ipnet = Ipv4Net::new(ip4.ip, prefix_len as u8) + .expect("prefix_len is the number of bits in a u32, so can not exceed 32"); + IpNet::V4(ipnet) + } + IfAddr::V6(ip6) => { + let prefix_len = (!u128::from_be_bytes(ip6.netmask.octets())).leading_zeros(); + let ipnet = Ipv6Net::new(ip6.ip, prefix_len as u8) + .expect("prefix_len is the number of bits in a u128, so can not exceed 128"); + IpNet::V6(ipnet) + } + }; + addrs.push((ip, ipn, ma)) + } + Ok(addrs) +} + impl TryFrom for Multiaddr { type Error = Error; diff --git a/src/lib.rs b/src/lib.rs index 1dab7142765..6f40f6b54be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -211,6 +211,9 @@ pub use libp2p_ping as ping; #[cfg_attr(docsrs, doc(cfg(feature = "plaintext")))] #[doc(inline)] pub use libp2p_plaintext as plaintext; +#[cfg(all(feature = "quic", not(any(target_os = "emscripten", target_os = "unknown"))))] +#[doc(inline)] +pub use libp2p_quic as quic; #[cfg(feature = "secio")] #[cfg_attr(docsrs, doc(cfg(feature = "secio")))] #[doc(inline)] diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml new file mode 100644 index 00000000000..05629e4c09d --- /dev/null +++ b/transports/quic/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "libp2p-quic" +version = "0.19.2" +authors = ["Parity Technologies "] +edition = "2018" +license = "MIT" +description = "A libp2p transport using QUIC" +keywords = ["peer-to-peer", "libp2p", "quic", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +async-std = "^1.5.0" +either = "1.5.3" +env_logger = "0.7.1" +futures = "0.3.4" +futures-timer = "3.0.2" +ipnet = "2.2.0" +libp2p-core = { path = "../../core", version = "0.19.0" } +log = "0.4.0" +parking_lot = "0.10.0" +quinn-proto = "0.6.1" +rcgen = { version = "0.8.1", default-features = false } +ring = "0.16.11" +rustls = { version = "0.17.0", features = ["dangerous_configuration"] } +thiserror = "1.0.15" +untrusted = "0.7.0" +webpki = "0.21.2" +yasna = "0.3.1" +tracing = "0.1.15" + +# TODO: RGMLRMLG fix that crate name +[dependencies.x509-signature] +version = "0.4.0" +features = ["webpki", "rustls", "std"] + +[dev-dependencies] +tracing = "0.1.15" +tracing-core = "0.1.10" +tracing-subscriber = "0.2.6" diff --git a/transports/quic/src/connection.rs b/transports/quic/src/connection.rs new file mode 100644 index 00000000000..ed3d4911526 --- /dev/null +++ b/transports/quic/src/connection.rs @@ -0,0 +1,414 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! A single QUIC connection. +//! +//! The [`Connection`] struct of this module contains, amongst other things, a +//! [`quinn_proto::Connection`] state machine and an `Arc`. This struct is responsible +//! for communication between quinn_proto's connection and its associated endpoint. +//! All interactions with a QUIC connection should be done through this struct. +// TODO: docs + +use crate::endpoint::Endpoint; + +use futures::{channel::mpsc, prelude::*}; +use std::{ + fmt, + net::SocketAddr, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Instant, +}; + +/// Underlying structure for both [`crate::QuicMuxer`] and [`crate::Upgrade`]. +/// +/// Contains everything needed to process a connection with a remote. +/// Tied to a specific [`crate::Endpoint`]. +pub(crate) struct Connection { + /// Endpoint this connection belongs to. + endpoint: Arc, + /// Future whose job is to send a message to the endpoint. Only one at a time. + pending_to_endpoint: Option + Send + Sync>>>, + /// Events that the endpoint will send in destination to our local [`quinn_proto::Connection`]. + /// Passed at initialization. + from_endpoint: mpsc::Receiver, + + /// The QUIC state machine for this specific connection. + connection: quinn_proto::Connection, + /// Identifier for this connection according to the endpoint. Used when sending messages to + /// the endpoint. + connection_id: quinn_proto::ConnectionHandle, + /// `Future` that triggers at the `Instant` that `self.connection.poll_timeout()` indicates. + next_timeout: Option, + + /// In other to avoid race conditions where a "connected" event happens if we were not + /// handshaking, we cache whether the connection is handshaking and only set this to true + /// after a "connected" event has been received. + /// + /// In other words, this flag indicates whether a "connected" hasn't been received yet. + is_handshaking: bool, + /// Contains a `Some` if the connection is closed, with the reason of the closure. + /// Contains `None` if it is still open. + /// Contains `Some` if and only if a `ConnectionLost` event has been emitted. + closed: Option, +} + +/// Error on the connection as a whole. +#[derive(Debug, Clone, thiserror::Error)] +pub enum Error { + /// Endpoint has force-killed this connection because it was too busy. + #[error("Endpoint has force-killed our connection")] + ClosedChannel, + /// Error in the inner state machine. + #[error("{0}")] + Quinn(#[from] quinn_proto::ConnectionError), +} + +impl Connection { + /// Crate-internal function that builds a [`Connection`] from raw components. + /// + /// This function assumes that there exists a background task that will process the messages + /// sent to `to_endpoint` and send us messages on `from_endpoint`. + /// + /// The `from_endpoint` can be purposefully closed by the endpoint if the connection is too + /// slow to process. + // TODO: is this necessary ^? figure out if quinn_proto doesn't forbid that situation in the first place + /// + /// `connection_id` is used to identify the local connection in the messages sent to + /// `to_endpoint`. + /// + /// This function assumes that the [`quinn_proto::Connection`] is completely fresh and none of + /// its methods has ever been called. Failure to comply might lead to logic errors and panics. + // TODO: maybe abstract `to_endpoint` more and make it generic? dunno + pub(crate) fn from_quinn_connection( + endpoint: Arc, + connection: quinn_proto::Connection, + connection_id: quinn_proto::ConnectionHandle, + from_endpoint: mpsc::Receiver, + ) -> Self { + // As the documentation mention, one is not supposed to call any of the methods on the + // `quinn_proto::Connection` before entering this function, and consequently, even if the + // connection has already been closed, there is no way for it to know that it has been + // closed. + assert!(!connection.is_closed()); + + let is_handshaking = connection.is_handshaking(); + + Connection { + endpoint, + pending_to_endpoint: None, + connection, + next_timeout: None, + from_endpoint, + connection_id, + is_handshaking, + closed: None, + } + } + + /// Returns the connection’s side (client or server) + pub(crate) fn side(&self) -> quinn_proto::Side { + self.connection.side() + } + + /// Returns the certificates sent by the remote through the underlying TLS session. + /// Returns `None` if the connection is still handshaking. + // TODO: it seems to happen that is_handshaking is false but this returns None + pub(crate) fn peer_certificates( + &self, + ) -> Option> { + self.connection + .crypto_session() + .get_peer_certificates() + .map(|l| l.into_iter().map(|l| l.into())) + } + + /// Returns the address of the node we're connected to. + // TODO: can change /!\ + pub(crate) fn remote_addr(&self) -> SocketAddr { + self.connection.remote_address() + } + + /// Returns `true` if this connection is still pending. Returns `false` if we are connected to + /// the remote or if the connection is closed. + pub(crate) fn is_handshaking(&self) -> bool { + self.is_handshaking + } + + /// If the connection is closed, returns why. If the connection is open, returns `None`. + /// + /// > **Note**: This method is also the main way to determine whether a connection is closed. + pub(crate) fn close_reason(&self) -> Option<&Error> { + assert!(!self.is_handshaking); + self.closed.as_ref() + } + + /// Start closing the connection. A [`ConnectionEvent::ConnectionLost`] event will be + /// produced in the future. + pub(crate) fn close(&mut self) { + // TODO: what if the user calls this multiple times? + // We send a dummy `0` error code with no message, as the API of StreamMuxer doesn't + // support this. + self.connection + .close(Instant::now(), From::from(0u32), Default::default()); + } + + /// Pops a new substream opened by the remote. + /// + /// If `None` is returned, then a [`ConnectionEvent::StreamAvailable`] event will later be + /// produced when a substream is available. + pub(crate) fn pop_incoming_substream(&mut self) -> Option { + self.connection.accept(quinn_proto::Dir::Bi) + } + + /// Pops a new substream opened locally. + /// + /// The API can be thought as if outgoing substreams were automatically opened by the local + /// QUIC connection and were added to a queue for availability. + /// + /// If `None` is returned, then a [`ConnectionEvent::StreamOpened`] event will later be + /// produced when a substream is available. + pub(crate) fn pop_outgoing_substream(&mut self) -> Option { + self.connection.open(quinn_proto::Dir::Bi) + } + + // TODO: better API + pub(crate) fn read_substream( + &mut self, + id: quinn_proto::StreamId, + buf: &mut [u8], + ) -> Result { + self.connection.read(id, buf).map(|n| n.unwrap_or(0)) + } + + pub(crate) fn write_substream( + &mut self, + id: quinn_proto::StreamId, + buf: &[u8], + ) -> Result { + self.connection.write(id, buf) + } + + pub(crate) fn is_drained(&self) -> bool { + self.connection.is_drained() + } + + pub(crate) fn shutdown_substream( + &mut self, + id: quinn_proto::StreamId, + ) -> Result<(), quinn_proto::FinishError> { + self.connection.finish(id) + } + + /// Polls the connection for an event that happend on it. + pub(crate) fn poll_event(&mut self, cx: &mut Context<'_>) -> Poll { + // Nothing more can be done if the connection is drained. + // Return `Pending` without registering the waker, essentially freezing the task forever. + if self.connection.is_drained() { + tracing::error!("poll_event called on a drained connection"); + return Poll::Pending; + } + + // Process events that the endpoint has sent to us. + loop { + match Pin::new(&mut self.from_endpoint).poll_next(cx) { + Poll::Ready(Some(event)) => self.connection.handle_event(event), + Poll::Ready(None) => { + assert!(self.closed.is_none()); + let err = Error::ClosedChannel; + self.closed = Some(err.clone()); + return Poll::Ready(ConnectionEvent::ConnectionLost(err)); + } + Poll::Pending => break, + } + } + + 'send_pending: loop { + // Sending the pending event to the endpoint. If the endpoint is too busy, we just + // stop the processing here. + // There is a bit of a question in play here: should be continue to accept events + // through `from_endpoint` if `to_endpoint` is busy? + // We need to be careful to avoid a potential deadlock if both `from_endpoint` and + // `to_endpoint` are full. As such, we continue to transfer data from `from_endpoint` + // to the `quinn_proto::Connection` (see above). + // However we don't deliver substream-related events to the user as long as + // `to_endpoint` is full. This should propagate the back-pressure of `to_endpoint` + // being full to the user. + if let Some(pending_to_endpoint) = &mut self.pending_to_endpoint { + match Future::poll(Pin::new(pending_to_endpoint), cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(()) => self.pending_to_endpoint = None, + } + } + + let now = Instant::now(); + + // Poll the connection for packets to send on the UDP socket and try to send them on + // `to_endpoint`. + while let Some(transmit) = self.connection.poll_transmit(now) { + let endpoint = self.endpoint.clone(); + assert!(self.pending_to_endpoint.is_none()); + self.pending_to_endpoint = Some(Box::pin(async move { + // TODO: ECN bits not handled + endpoint + .send_udp_packet(transmit.destination, transmit.contents) + .await; + })); + continue 'send_pending; + } + + // The connection also needs to be able to send control messages to the endpoint. This is + // handled here, and we try to send them on `to_endpoint` as well. + while let Some(endpoint_event) = self.connection.poll_endpoint_events() { + let endpoint = self.endpoint.clone(); + let connection_id = self.connection_id; + assert!(self.pending_to_endpoint.is_none()); + self.pending_to_endpoint = Some(Box::pin(async move { + endpoint + .report_quinn_event(connection_id, endpoint_event) + .await; + })); + continue 'send_pending; + } + + // Timeout system. + // We break out of the following loop until if `poll_timeout()` returns `None` or if + // polling `self.next_timeout` returns `Poll::Pending`. + loop { + if let Some(next_timeout) = &mut self.next_timeout { + match Future::poll(Pin::new(next_timeout), cx) { + Poll::Ready(()) => { + self.connection.handle_timeout(now); + self.next_timeout = None; + } + Poll::Pending => break, + } + } else if let Some(when) = self.connection.poll_timeout() { + if when <= now { + self.connection.handle_timeout(now); + } else { + let delay = when - now; + self.next_timeout = Some(futures_timer::Delay::new(delay)); + } + } else { + break; + } + } + + // The final step consists in handling the events related to the various substreams. + while let Some(event) = self.connection.poll() { + match event { + quinn_proto::Event::Stream(quinn_proto::StreamEvent::Opened { + dir: quinn_proto::Dir::Uni, + }) + | quinn_proto::Event::Stream(quinn_proto::StreamEvent::Available { + dir: quinn_proto::Dir::Uni, + }) + | quinn_proto::Event::DatagramReceived => { + // We don't use datagrams or unidirectional streams. If these events + // happen, it is by some code not compatible with libp2p-quic. + self.connection + .close(Instant::now(), From::from(0u32), Default::default()); + } + quinn_proto::Event::Stream(quinn_proto::StreamEvent::Readable { id }) => { + return Poll::Ready(ConnectionEvent::StreamReadable(id)); + } + quinn_proto::Event::Stream(quinn_proto::StreamEvent::Writable { id }) => { + return Poll::Ready(ConnectionEvent::StreamWritable(id)); + } + quinn_proto::Event::Stream(quinn_proto::StreamEvent::Available { + dir: quinn_proto::Dir::Bi, + }) => { + return Poll::Ready(ConnectionEvent::StreamAvailable); + } + quinn_proto::Event::Stream(quinn_proto::StreamEvent::Opened { + dir: quinn_proto::Dir::Bi, + }) => { + return Poll::Ready(ConnectionEvent::StreamOpened); + } + quinn_proto::Event::ConnectionLost { reason } => { + assert!(self.closed.is_none()); + self.is_handshaking = false; + let err = Error::Quinn(reason); + self.closed = Some(err.clone()); + return Poll::Ready(ConnectionEvent::ConnectionLost(err)); + } + quinn_proto::Event::Stream(quinn_proto::StreamEvent::Finished { + id, + stop_reason: _, + }) => { + // TODO: transmit `stop_reason` + return Poll::Ready(ConnectionEvent::StreamFinished(id)); + } + quinn_proto::Event::Connected => { + assert!(self.is_handshaking); + assert!(!self.connection.is_handshaking()); + self.is_handshaking = false; + return Poll::Ready(ConnectionEvent::Connected); + } + } + } + + break; + } + + Poll::Pending + } +} + +impl fmt::Debug for Connection { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("Connection").finish() + } +} + +impl Drop for Connection { + fn drop(&mut self) { + // TODO: don't do that if already drained + // We send a message to the endpoint. + self.endpoint.report_quinn_event_non_block( + self.connection_id, + quinn_proto::EndpointEvent::drained(), + ); + } +} + +/// Event generated by the [`Connection`]. +#[derive(Debug)] +pub(crate) enum ConnectionEvent { + /// Now connected to the remote. Can only happen if [`Connection::is_handshaking`] was + /// returning `true`. + Connected, + + /// Connection has been closed and can no longer be used. + ConnectionLost(Error), + + /// Generated after [`Connection::pop_incoming_substream`] has been called and has returned + /// `None`. After this event has been generated, this method is guaranteed to return `Some`. + StreamAvailable, + /// Generated after [`Connection::pop_outgoing_substream`] has been called and has returned + /// `None`. After this event has been generated, this method is guaranteed to return `Some`. + StreamOpened, + + StreamReadable(quinn_proto::StreamId), + StreamWritable(quinn_proto::StreamId), + StreamFinished(quinn_proto::StreamId), +} diff --git a/transports/quic/src/endpoint.rs b/transports/quic/src/endpoint.rs new file mode 100644 index 00000000000..36dbd4ceede --- /dev/null +++ b/transports/quic/src/endpoint.rs @@ -0,0 +1,621 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Background task dedicated to manage the QUIC state machine. +//! +//! Considering that all QUIC communications happen over a single UDP socket, one needs to +//! maintain a unique synchronization point that holds the state of all the active connections. +//! +//! The [`Endpoint`] object represents this synchronization point. It maintains a background task +//! whose role is to interface with the UDP socket. Communication between the background task and +//! the rest of the code only happens through channels. See the documentation of the +//! [`background_task`] for a thorough description. + +use crate::{connection::Connection, x509}; + +use async_std::net::SocketAddr; +use either::Either; +use futures::{ + channel::{mpsc, oneshot}, + lock::Mutex, + prelude::*, +}; +use libp2p_core::{ + multiaddr::{host_addresses, Multiaddr, Protocol}, + transport::TransportError, +}; +use std::{ + collections::{HashMap, VecDeque}, + fmt, io, + sync::{Arc, Weak}, + task::Poll, + time::{Duration, Instant}, +}; +use tracing::{info, warn}; + +/// Represents the configuration for the [`Endpoint`]. +#[derive(Debug, Clone)] +pub struct Config { + /// The client configuration to pass to `quinn_proto`. + client_config: quinn_proto::ClientConfig, + /// The server configuration to pass to `quinn_proto`. + server_config: Arc, + /// The endpoint configuration to pass to `quinn_proto`. + endpoint_config: Arc, + /// The [`Multiaddr`] to use to spawn the UDP socket. + multiaddr: Multiaddr, +} + +impl Config { + /// Creates a new configuration object with default values. + pub fn new( + keypair: &libp2p_core::identity::Keypair, + multiaddr: Multiaddr, + ) -> Result { + let mut transport = quinn_proto::TransportConfig::default(); + transport.stream_window_uni(0); + transport.datagram_receive_buffer_size(None); + transport.keep_alive_interval(Some(Duration::from_millis(10))); + let transport = Arc::new(transport); + let (client_tls_config, server_tls_config) = x509::make_tls_config(keypair)?; + let mut server_config = quinn_proto::ServerConfig::default(); + server_config.transport = transport.clone(); + server_config.crypto = Arc::new(server_tls_config); + let mut client_config = quinn_proto::ClientConfig::default(); + client_config.transport = transport; + client_config.crypto = Arc::new(client_tls_config); + Ok(Self { + client_config, + server_config: Arc::new(server_config), + endpoint_config: Default::default(), + multiaddr: multiaddr, + }) + } +} + +/// Object containing all the QUIC resources shared between all connections. +// TODO: expand docs +// TODO: Debug trait +// TODO: remove useless fields +pub struct Endpoint { + /// Channel to the background of the endpoint. + /// See [`Endpoint::new_connections`] (just below) for a commentary about the mutex. + to_endpoint: Mutex>, + + /// Channel where new connections are being sent. + /// This is protected by a futures-friendly `Mutex`, meaning that receiving a connection is + /// done in two steps: locking this mutex, and grabbing the next element on the `Receiver`. + /// The only consequence of this `Mutex` is that multiple simultaneous calls to + /// [`Endpoint::next_incoming`] are serialized. + new_connections: Mutex>>, + + /// Copy of [`Endpoint::to_endpoint`], except not behind a `Mutex`. Used if we want to be guaranteed a + /// slot in the messages buffer. + to_endpoint2: mpsc::Sender, +} + +impl Endpoint { + /// Builds a new `Endpoint`. + pub fn new(config: Config) -> Result, TransportError> { + let mut multiaddr = config.multiaddr.clone(); + let local_socket_addr = match crate::transport::multiaddr_to_socketaddr(&config.multiaddr) { + Ok(a) => a, + Err(()) => return Err(TransportError::MultiaddrNotSupported(multiaddr)), + }; + + // NOT blocking, as per man:bind(2), as we pass an IP address. + let socket = + std::net::UdpSocket::bind(&local_socket_addr).map_err(TransportError::Other)?; + // TODO: + let port_is_zero = local_socket_addr.port() == 0; + let local_socket_addr = socket.local_addr().map_err(TransportError::Other)?; + if port_is_zero { + assert_ne!(local_socket_addr.port(), 0); + assert_eq!(multiaddr.pop(), Some(Protocol::Quic)); + assert_eq!(multiaddr.pop(), Some(Protocol::Udp(0))); + multiaddr.push(Protocol::Udp(local_socket_addr.port())); + multiaddr.push(Protocol::Quic); + } + + let (to_endpoint_tx, to_endpoint_rx) = mpsc::channel(32); + let to_endpoint2 = to_endpoint_tx.clone(); + let (new_connections_tx, new_connections_rx) = mpsc::channel(500); + + let endpoint = Arc::new(Endpoint { + to_endpoint: Mutex::new(to_endpoint_tx), + to_endpoint2, + new_connections: Mutex::new(new_connections_rx), + }); + + let send_addr = |e| { + new_connections_tx + .clone() + .try_send(Either::Right(e)) + .expect("we just cloned this, so we have capacity; qed") + }; + + // TODO: IP address stuff + if local_socket_addr.ip().is_unspecified() { + tracing::info!("returning all local IPs for unspecified address"); + let suffixes = [Protocol::Udp(local_socket_addr.port()), Protocol::Quic]; + let local_addresses = host_addresses(&suffixes).map_err(TransportError::Other)?; + for (_, _, address) in local_addresses { + tracing::info!("sending address {:?}", address); + send_addr(address) + } + } else { + tracing::info!("sending address {:?}", multiaddr); + send_addr(multiaddr) + } + + // TODO: just for testing, do proper task spawning + async_std::task::spawn(background_task( + config.clone(), + Arc::downgrade(&endpoint), + async_std::net::UdpSocket::from(socket), + new_connections_tx, + to_endpoint_rx.fuse(), + )); + + // let endpoint = EndpointRef { reference, channel }; + // let join_handle = spawn(endpoint.clone()); + // Ok((Self(endpoint), join_handle)) + + Ok(endpoint) + } + + /// Asks the endpoint to start dialing the given address. + /// + /// Note that this method only *starts* the dialing. `Ok` is returned as soon as possible, even + /// when the remote might end up being unreachable. + pub(crate) async fn dial( + &self, + addr: SocketAddr, + ) -> Result { + // The two `expect`s below can panic if the background task has stopped. The background + // task can stop only if the `Endpoint` is destroyed or if the task itself panics. In other + // words, we panic here iff a panic has already happened somewhere else, which is a + // reasonable thing to do. + let (tx, rx) = oneshot::channel(); + self.to_endpoint + .lock() + .await + .send(ToEndpoint::Dial { addr, result: tx }) + .await + .expect("background task has crashed"); + info!("Sent dial message, awaiting response"); + rx.await.expect("background task has crashed") + } + + /// Tries to pop a new incoming connection from the queue. + pub(crate) async fn next_incoming(&self) -> Either { + // The `expect` below can panic if the background task has stopped. The background task + // can stop only if the `Endpoint` is destroyed or if the task itself panics. In other + // words, we panic here iff a panic has already happened somewhere else, which is a + // reasonable thing to do. + let mut new_connections = self.new_connections.lock().await; + new_connections + .next() + .await + .expect("background task has crashed") + } + + /// Asks the endpoint to send a UDP packet. + /// + /// Note that this method only queues the packet and returns as soon as the packet is in queue. + /// There is no guarantee that the packet will actually be sent, but considering that this is + /// a UDP packet, you cannot rely on the packet being delivered anyway. + pub(crate) async fn send_udp_packet( + &self, + destination: SocketAddr, + data: impl Into>, + ) { + let _ = self + .to_endpoint + .lock() + .await + .send(ToEndpoint::SendUdpPacket { + destination, + data: data.into(), + }) + .await; + } + + /// Report to the endpoint an event on a [`quinn_proto::Connection`]. + /// + /// This is typically called by a [`Connection`]. + /// + /// If `event.is_drained()` is true, the event indicates that the connection no longer exists. + /// This must therefore be the last event sent using this [`quinn_proto::ConnectionHandle`]. + pub(crate) async fn report_quinn_event( + &self, + connection_id: quinn_proto::ConnectionHandle, + event: quinn_proto::EndpointEvent, + ) { + self.to_endpoint + .lock() + .await + .send(ToEndpoint::ProcessConnectionEvent { + connection_id, + event, + }) + .await + .expect("background task has crashed"); + } + + /// Similar to [`Endpoint::report_quinn_event`], except that the message sending is guaranteed + /// to be instantaneous and to succeed. + /// + /// This method bypasses back-pressure mechanisms and is meant to be called only from + /// destructors, where waiting is not advisable. + pub(crate) fn report_quinn_event_non_block( + &self, + connection_id: quinn_proto::ConnectionHandle, + event: quinn_proto::EndpointEvent, + ) { + // We implement this by cloning the `mpsc::Sender`. Since each sender is guaranteed a slot + // in the buffer, cloning the sender reserves the slot and sending thus always succeeds. + let result = self + .to_endpoint2 + .clone() + .try_send(ToEndpoint::ProcessConnectionEvent { + connection_id, + event, + }); + assert!(result.is_ok()); + } +} + +/// Message sent to the endpoint background task. +#[derive(Debug)] +enum ToEndpoint { + /// Instruct the endpoint to start connecting to the given address. + Dial { + /// UDP address to connect to. + addr: SocketAddr, + /// Channel to return the result of the dialing to. + result: oneshot::Sender>, + }, + /// Sent by a `quinn_proto` connection when the endpoint needs to process an event generated + /// by a connection. The event itself is opaque to us. Only `quinn_proto` knows what is in + /// there. + ProcessConnectionEvent { + connection_id: quinn_proto::ConnectionHandle, + event: quinn_proto::EndpointEvent, + }, + /// Instruct the endpoint to send a packet of data on its UDP socket. + SendUdpPacket { + /// Destination of the UDP packet. + destination: SocketAddr, + /// Packet of data to send. + data: Box<[u8]>, + }, +} + +/// Task that runs in the background for as long as the endpont is alive. Responsible for +/// processing messages and the UDP socket. +/// +/// The `receiver` parameter must be the receiving side of the `Endpoint::to_endpoint` sender. +/// +/// # Behaviour +/// +/// This background task is responsible for the following: +/// +/// - Sending packets on the UDP socket. +/// - Receiving packets from the UDP socket and feed them to the [`quinn_proto::Endpoint`] state +/// machine. +/// - Transmitting events generated by the [`quinn_proto::Endpoint`] to the corresponding +/// [`Connection`]. +/// - Receiving messages from the `receiver` and processing the requested actions. This includes +/// UDP packets to send and events emitted by the [`Connection`] objects. +/// - Sending new connections on `new_connections`. +/// +/// When it comes to channels, there exists three main multi-producer-single-consumer channels +/// in play: +/// +/// - One channel, represented by `Endpoint::to_endpoint` and `receiver`, that communicates +/// messages from [`Endpoint`] to the background task and from the [`Connection`] to the +/// background task. +/// - One channel per each existing connection that communicates messages from the background +/// task to that [`Connection`]. +/// - One channel for the background task to send newly-opened connections to. The receiving +/// side is normally processed by a "listener" as defined by the [`libp2p_core::Transport`] +/// trait. +/// +/// In order to avoid an unbounded buffering of events, we prioritize sending data on the UDP +/// socket over everything else. If the network interface is too busy to process our packets, +/// everything comes to a freeze (including receiving UDP packets) until it is ready to accept +/// more. +/// +/// Apart from freezing when the network interface is too busy, the background task should sleep +/// as little as possible. It is in particular important for the `receiver` to be drained as +/// quickly as possible in order to avoid unnecessary back-pressure on the [`Connection`] objects. +/// +/// ## Back-pressure on `new_connections` +/// +/// The [`quinn_proto::Endpoint`] object contains an accept buffer, in other words a buffer of the +/// incoming connections waiting to be accepted. When a new connection is signalled, we send this +/// new connection on the `new_connections` channel in an asynchronous way, and we only free a slot +/// in the accept buffer once the element has actually been enqueued on `new_connections`. There +/// are therefore in total three buffers in play: the `new_connections` channel itself, the queue +/// of elements being sent on `new_connections`, and the accept buffer of the +/// [`quinn_proto::Endpoint`]. +/// +/// Unfortunately, this design has the consequence that, on the network layer, we will accept a +/// certain number of incoming connections even if [`Endpoint::next_incoming`] is never even +/// called. The `quinn-proto` library doesn't provide any way to not accept incoming connections +/// apart from filling the accept buffer. +/// +/// ## Back-pressure on connections +/// +/// Because connections are processed by the user at a rate of their choice, we cannot properly +/// handle the situation where the channel from the background task to individual connections is +/// full. Sleeping the task while waiting for the connection to be processed by the user could +/// even lead to a deadlock if this processing is also sleeping waiting for some other action that +/// itself depends on the background task (e.g. if processing the connection is waiting for a +/// message arriving on a different connection). +/// +/// In an ideal world, we would handle a background-task-to-connection channel being full by +/// dropping UDP packets destined to this connection, as a way to back-pressure the remote. +/// Unfortunately, the `quinn-proto` library doesn't provide any way for us to know which +/// connection a UDP packet is destined for before it has been turned into a +/// [`ConnectionEvent`](quinn_proto::ConnectionEvent) and because these +/// [`ConnectionEvent`](quinn_proto::ConnectionEvent)s are sometimes used to synchronize the states +/// of the endpoint and connection, it would be a logic error to silently drop them. +/// +/// We handle this tricky situation by simply killing connections as soon as their associated +/// channel is full. +/// +// TODO: actually implement the killing of connections if channel is full, at the moment we just +// wait +/// # Shutdown +/// +/// The background task shuts down if `endpoint_weak`, `receiver` or `new_connections` become +/// disconnected/invalid. This corresponds to the lifetime of the associated [`Endpoint`]. +/// +/// Keep in mind that we pass an `Arc` whenever we create a new connection, which +/// guarantees that the [`Endpoint`], and therefore the background task, is properly kept alive +/// for as long as any QUIC connection is open. + +#[derive(Copy, Clone, Debug)] +enum Void {} +async fn background_task( + config: Config, + endpoint_weak: Weak, + udp_socket: async_std::net::UdpSocket, + mut new_connections: mpsc::Sender>, + mut receiver: stream::Fuse>, +) { + // The actual QUIC state machine. + let mut endpoint = quinn_proto::Endpoint::new( + config.endpoint_config.clone(), + Some(config.server_config.clone()), + ); + + // List of all active connections, with a sender to notify them of events. + let mut alive_connections = HashMap::>::new(); + + // Buffer where we write packets received from the UDP socket. + let mut socket_recv_buffer = vec![0; 65536]; + + // The quinn_proto endpoint can give us new connections for as long as its accept buffer + // isn't full. This buffer is used to push these new connections while we are waiting to + // send them on the `new_connections` channel. We only call `endpoint.accept()` when we remove + // an element from this list, which guarantees that it doesn't grow unbounded. + // TODO: with_capacity? + let mut queued_new_connections = VecDeque::new(); + + // Next packet waiting to be transmitted on the UDP socket, if any. + // Note that this variable isn't strictly necessary, but it reduces code duplication in the + // code below. + let mut next_packet_out: Option<(SocketAddr, Box<[u8]>)> = None; + + // Main loop of the task. + loop { + // Start by flushing `next_packet_out`. + if let Some((destination, data)) = next_packet_out.take() { + tracing::trace!("sending {} bytes to {}", data.len(), destination); + // We block the current task until the packet is sent. This way, if the + // network interface is too busy, we back-pressure all of our internal + // channels. + // TODO: set ECN bits; there is no support for them in the ecosystem right now + // TODO: use a circular buffer instead. + match udp_socket.send_to(&data, destination).await { + Ok(n) if n == data.len() => {} + Ok(_) => tracing::error!( + "QUIC UDP socket violated expectation that packets are always fully \ + transferred" + ), + + // Errors on the socket are expected to never happen, and we handle them by simply + // printing a log message. The packet gets discarded in case of error, but we are + // robust to packet losses and it is consequently not a logic error to process with + // normal operations. + Err(err) => tracing::error!("Error while sending on QUIC UDP socket: {:?}", err), + } + } + + // The endpoint might request packets to be sent out. This is handled in priority to avoid + // buffering up packets. + if let Some(packet) = endpoint.poll_transmit() { + tracing::trace!("Got a new packet to send"); + assert!(next_packet_out.is_none()); + next_packet_out = Some((packet.destination, packet.contents)); + continue; + } + + futures::select! { + message = receiver.next() => { + // Received a message from a different part of the code requesting us to + // do something. + span!("message received"); + match message { + // Shut down if the endpoint has shut down. + None => return, + + Some(ToEndpoint::Dial { addr, result }) => { + span!("dialing", addr = display(addr), side = debug(quinn_proto::Side::Client)); + info!("received dialout request"); + // This `"l"` seems necessary because an empty string is an invalid domain + // name. While we don't use domain names, the underlying rustls library + // is based upon the assumption that we do. + let (connection_id, connection) = + match endpoint.connect(config.client_config.clone(), addr, "l") { + Ok(c) => c, + Err(err) => { + let _ = result.send(Err(err)); + warn!("QUIC connection failure"); + continue; + } + }; + + info!("received connection ID: {:?}", connection_id); + let endpoint_arc = match endpoint_weak.upgrade() { + Some(ep) => ep, + None => return, // Shut down the task if the endpoint is dead. + }; + + info!("endpoint is alive"); + + assert_eq!(connection.side(), quinn_proto::Side::Client); + let (tx, rx) = mpsc::channel(16); + let connection = Connection::from_quinn_connection(endpoint_arc, connection, connection_id, rx); + alive_connections.insert(connection_id, tx); + let _ = result.send(Ok(connection)); + info!("sent reply to dialer"); + } + + // A connection wants to notify the endpoint of something. + Some(ToEndpoint::ProcessConnectionEvent { connection_id, event }) => { + if !alive_connections.contains_key(&connection_id) { + continue + } + // We "drained" event indicates that the connection no longer exists and + // its ID can be reclaimed. + let is_drained_event = event.is_drained(); + if is_drained_event { + alive_connections.remove(&connection_id); + } + if let Some(event_back) = endpoint.handle_event(connection_id, event) { + assert!(!is_drained_event); + // TODO: don't await here /!\ + let _ = alive_connections.get_mut(&connection_id).unwrap().clone().try_send(event_back); + } + } + + // Data needs to be sent on the UDP socket. + Some(ToEndpoint::SendUdpPacket { destination, data }) => { + assert!(next_packet_out.is_none()); + next_packet_out = Some((destination, data)); + continue; + } + } + } + + // The future we create here wakes up if two conditions are fulfilled: + // + // - The `new_connections` channel is ready to accept a new element. + // - `queued_new_connections` is not empty. + // + // When this happens, we pop an element from `queued_new_connections`, put it on the + // channel, and call `endpoint.accept()`, thereby allowing the QUIC state machine to + // feed a new incoming connection to us. + readiness = { + let active = !queued_new_connections.is_empty(); + let new_connections = &mut new_connections; + future::poll_fn(move |cx| { + if active { new_connections.poll_ready(cx) } else { Poll::Pending } + }).fuse() + } => { + span!("time to accept connection"); + if readiness.is_err() { + // new_connections channel has been dropped, meaning that the endpoint has + // been destroyed. + return; + } + + let elem = queued_new_connections.pop_front() + .expect("if queue is empty, the future above is always Pending; qed"); + new_connections.start_send(elem) + .expect("future is waken up only if poll_ready returned Ready; qed"); + endpoint.accept(); + } + + result = udp_socket.recv_from(&mut socket_recv_buffer).fuse() => { + let (packet_len, packet_src) = match result { + Ok(v) => v, + // Errors on the socket are expected to never happen, and we handle them by + // simply printing a log message. + Err(err) => { + tracing::error!("Error while receive on QUIC UDP socket: {:?}", err); + continue; + }, + }; + span!("received packet", len = display(packet_len), src = display(packet_src)); + tracing::trace!("processing"); + + // Received a UDP packet from the socket. + assert!(packet_len <= socket_recv_buffer.len()); + let packet = From::from(&socket_recv_buffer[..packet_len]); + // TODO: ECN bits aren't handled + match endpoint.handle(Instant::now(), packet_src, None, packet) { + None => {}, + Some((connec_id, quinn_proto::DatagramEvent::ConnectionEvent(event))) => { + // Event to send to an existing connection. + if let Some(sender) = alive_connections.get_mut(&connec_id) { + let _ = sender.clone().try_send(event); + } else { + tracing::error!("State mismatch: event for closed connection"); + } + }, + Some((connec_id, quinn_proto::DatagramEvent::NewConnection(connec))) => { + // A new connection has been received. `connec_id` is a newly-allocated + // identifier. + assert_eq!(connec.side(), quinn_proto::Side::Server); + let (tx, rx) = mpsc::channel(16); + alive_connections.insert(connec_id, tx); + let endpoint_arc = match endpoint_weak.upgrade() { + Some(ep) => ep, + None => { + tracing::trace!("endpoint is dead, exiting"); + return // Shut down the task if the endpoint is dead. + } + }; + let connection = Connection::from_quinn_connection(endpoint_arc, connec, connec_id, rx); + + // As explained in the documentation, we put this new connection in an + // intermediary buffer. At the next loop iteration we will try to move it + // to the `new_connections` channel. We call `endpoint.accept()` only once + // the element has successfully been sent on `new_connections`. + queued_new_connections.push_back(Either::Left(connection)); + tracing::trace!("connection queued"); + }, + } + tracing::trace!("receive processing complete"); + } + } + } +} + +impl fmt::Debug for Endpoint { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("Endpoint").finish() + } +} diff --git a/transports/quic/src/error.rs b/transports/quic/src/error.rs new file mode 100644 index 00000000000..4da9ec00c99 --- /dev/null +++ b/transports/quic/src/error.rs @@ -0,0 +1,73 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::channel::mpsc::SendError; +use io::ErrorKind; +use std::io; +use thiserror::Error; + +/// An error that can be returned by libp2p-quic. +#[derive(Error, Debug)] +pub enum Error { + /// Fatal I/O error + #[error("Fatal I/O error {0}")] + IO(#[from] std::io::Error), + /// QUIC protocol error + #[error("QUIC protocol error: {0}")] + ConnectionError(#[from] quinn_proto::ConnectionError), + /// Peer stopped receiving data + #[error("Peer stopped receiving data: code {0}")] + Stopped(quinn_proto::VarInt), + /// Connection was prematurely closed + #[error("Connection was prematurely closed")] + ConnectionLost, + /// Error making the connection. + #[error("Connection failure: {0}")] + ConnectError(#[from] quinn_proto::ConnectError), + /// Cannot listen on the same endpoint more than once + #[error("Cannot listen on the same endpoint more than once")] + AlreadyListening, + /// The stream was reset by the peer. + #[error("Peer reset stream: code {0}")] + Reset(quinn_proto::VarInt), + /// Problem finishing stream + #[error("Error finishing stream: {0}")] + Finish(#[from] quinn_proto::FinishError), + /// Connection already being closed + #[error("Connection already being closed")] + ConnectionClosing, +} + +impl From for io::Error { + fn from(e: Error) -> Self { + match e { + Error::IO(e) => io::Error::new(e.kind(), Error::IO(e)), + Error::ConnectionError(e) => e.into(), + e @ Error::ConnectionClosing | e @ Error::ConnectError(_) => { + io::Error::new(ErrorKind::Other, e) + } + e @ Error::Stopped(_) | e @ Error::Reset(_) | e @ Error::ConnectionLost => { + io::Error::new(ErrorKind::ConnectionAborted, e) + } + e @ Error::Finish(_) => io::Error::new(ErrorKind::BrokenPipe, e), + e @ Error::AlreadyListening => io::Error::new(ErrorKind::AddrInUse, e), + } + } +} diff --git a/transports/quic/src/lib.rs b/transports/quic/src/lib.rs new file mode 100644 index 00000000000..f9c9c7bc2c7 --- /dev/null +++ b/transports/quic/src/lib.rs @@ -0,0 +1,76 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#![recursion_limit = "1024"] + +//! Implementation of the libp2p `Transport` and `StreamMuxer` traits for QUIC. +//! +//! # Usage +//! +//! Example: +//! +//! ``` +//! use libp2p_quic::{Config, Endpoint}; +//! use libp2p_core::Multiaddr; +//! +//! let keypair = libp2p_core::identity::Keypair::generate_ed25519(); +//! let addr = "/ip4/127.0.0.1/udp/12345/quic".parse().expect("bad address?"); +//! let quic_config = Config::new(&keypair, addr).expect("could not make config"); +//! let quic_endpoint = Endpoint::new(quic_config).expect("I/O error"); +//! ``` +//! +//! The `Endpoint` struct implements the `Transport` trait of the `core` library. See the +//! documentation of `core` and of libp2p in general to learn how to use the `Transport` trait. +//! +//! Note that QUIC provides transport, security, and multiplexing in a single protocol. Therefore, +//! QUIC connections do not need to be upgraded. You will get a compile-time error if you try. +//! Instead, you must pass all needed configuration into the constructor. +//! +//! # Design Notes +//! +//! The entry point is the `Endpoint` struct. It represents a single QUIC endpoint. You +//! should generally have one of these per process. +//! +//! `Endpoint` manages a background task that processes all incoming packets. Each +//! `QuicConnection` also manages a background task, which handles socket output and timer polling. + +#![deny(unsafe_code)] + +macro_rules! span { + ($name: expr $(,$($i: tt)+)?) => { + let span = ::tracing::trace_span!($name $(,$($i)+)?); + let _guard = span.enter(); + } +} + +mod connection; +mod endpoint; +mod error; +mod muxer; +mod upgrade; +mod x509; + +pub mod transport; + +pub use endpoint::{Config, Endpoint}; +pub use error::Error; +pub use muxer::QuicMuxer; +pub use transport::QuicTransport; +pub use upgrade::Upgrade; diff --git a/transports/quic/src/muxer.rs b/transports/quic/src/muxer.rs new file mode 100644 index 00000000000..f9a7009afca --- /dev/null +++ b/transports/quic/src/muxer.rs @@ -0,0 +1,375 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::connection::{Connection, ConnectionEvent}; +use crate::error::Error; + +use libp2p_core::StreamMuxer; +use parking_lot::Mutex; +use std::{ + collections::HashMap, + fmt, + task::{Context, Poll, Waker}, +}; + +/// State for a single opened QUIC connection. +// TODO: the inner `Mutex` should theoretically be a `futures::lock::Mutex` (or something similar), +// in order to sleep the current task if concurrent access to the connection is required +pub struct QuicMuxer { + inner: Mutex, +} + +/// Mutex-protected fields of [`QuicMuxer`]. +struct QuicMuxerInner { + /// Inner connection object that yields events. + connection: Connection, + /// State of all the substreams that the muxer reports as open. + substreams: HashMap, + /// Waker to wake if a new outgoing substream is opened. + poll_substream_opened_waker: Option, + /// Waker to wake if the connection is closed. + poll_close_waker: Option, + /// Count of active (writable) substreams. + writable_substreams: usize, +} + +/// State of a single substream. +#[derive(Default)] +struct SubstreamState { + /// Waker to wake if the substream becomes readable. + read_waker: Option, + /// Waker to wake if the substream becomes writable. + write_waker: Option, + /// Waker to wake if the substream becomes closed. + finished_waker: Option, + /// `true` if and only if the substream has been closed for reading. + read_closed: bool, + /// `true` if and only if the substream has been closed for writing. + write_closed: bool, +} + +impl QuicMuxer { + /// Crate-internal function that builds a [`QuicMuxer`] from a raw connection. + /// + /// # Panic + /// + /// Panics if `connection.is_handshaking()` returns `true`. + pub(crate) fn from_connection(connection: Connection) -> Self { + assert!(!connection.is_handshaking()); + + QuicMuxer { + inner: Mutex::new(QuicMuxerInner { + connection, + substreams: Default::default(), + poll_substream_opened_waker: None, + poll_close_waker: None, + writable_substreams: 0, + }), + } + } +} + +impl StreamMuxer for QuicMuxer { + type OutboundSubstream = (); + type Substream = quinn_proto::StreamId; + type Error = Error; + + fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { + // We use `poll_inbound` to perform the background processing of the entire connection. + let mut inner = self.inner.lock(); + span!("poll_inbound", side = debug(inner.connection.side())); + tracing::trace!("poll_inbound called"); + + while let Poll::Ready(event) = inner.connection.poll_event(cx) { + match event { + ConnectionEvent::Connected => { + log::error!("Unexpected Connected event on established QUIC connection"); + } + ConnectionEvent::ConnectionLost(_) => { + if let Some(waker) = inner.poll_close_waker.take() { + waker.wake(); + } + } + + ConnectionEvent::StreamOpened => { + if let Some(waker) = inner.poll_substream_opened_waker.take() { + waker.wake(); + } + } + ConnectionEvent::StreamReadable(substream) => { + if let Some(substream) = inner.substreams.get_mut(&substream) { + if let Some(waker) = substream.read_waker.take() { + waker.wake(); + } + } + } + ConnectionEvent::StreamWritable(substream) => { + if let Some(substream) = inner.substreams.get_mut(&substream) { + if let Some(waker) = substream.write_waker.take() { + waker.wake(); + } + } + } + ConnectionEvent::StreamFinished(substream) => { + if let Some(substream) = inner.substreams.get_mut(&substream) { + if let Some(waker) = substream.read_waker.take() { + waker.wake(); + } + if let Some(waker) = substream.write_waker.take() { + waker.wake(); + } + if let Some(waker) = substream.finished_waker.take() { + waker.wake(); + } + substream.write_closed = true + } + inner.writable_substreams -= 1; + } + + // Do nothing as this is handled below. + ConnectionEvent::StreamAvailable => {} + } + } + + if let Some(substream) = inner.connection.pop_incoming_substream() { + inner.substreams.insert(substream, Default::default()); + inner.writable_substreams += 1; + tracing::trace!("New substream"); + Poll::Ready(Ok(substream)) + } else if inner.connection.is_drained() { + if let Some(w) = inner.poll_close_waker.take() { + tracing::trace!("Inner connection is drained, waking close waker"); + w.wake() + } + Poll::Ready(Err(Error::ConnectionLost)) + } else { + Poll::Pending + } + } + + fn open_outbound(&self) -> Self::OutboundSubstream {} + + fn poll_outbound( + &self, + cx: &mut Context<'_>, + _: &mut Self::OutboundSubstream, + ) -> Poll> { + // Note that this implementation makes it possible to poll the same outbound substream + // over and over again and get new substreams. Using the API this way is invalid and would + // normally result in a panic, but we decide to just ignore this question. + + let mut inner = self.inner.lock(); + if let Some(substream) = inner.connection.pop_outgoing_substream() { + inner.substreams.insert(substream, Default::default()); + inner.writable_substreams += 1; + return Poll::Ready(Ok(substream)); + } + + // Register `cx.waker()` as having to be woken up once a substream is available. + if !inner + .poll_substream_opened_waker + .as_ref() + .map_or(false, |w| w.will_wake(cx.waker())) + { + inner.poll_substream_opened_waker = Some(cx.waker().clone()); + } + Poll::Pending + } + + fn destroy_outbound(&self, _: Self::OutboundSubstream) {} + + fn is_remote_acknowledged(&self) -> bool { + // TODO: stub + true + } + + fn write_substream( + &self, + cx: &mut Context<'_>, + substream: &mut Self::Substream, + buf: &[u8], + ) -> Poll> { + let mut inner = self.inner.lock(); + + match inner.connection.write_substream(*substream, buf) { + Ok(bytes) => Poll::Ready(Ok(bytes)), + Err(quinn_proto::WriteError::Stopped(_)) => Poll::Ready(Ok(0)), // EOF + Err(quinn_proto::WriteError::Blocked) => { + if let Some(substream) = inner.substreams.get_mut(substream) { + if !substream + .write_waker + .as_ref() + .map_or(false, |w| w.will_wake(cx.waker())) + { + substream.write_waker = Some(cx.waker().clone()); + } + } + Poll::Pending + } + Err(quinn_proto::WriteError::UnknownStream) => { + log::error!( + "The application used a connection that is already being \ + closed. This is a bug in the application or in libp2p." + ); + Poll::Pending + } + } + } + + /// Try to read from a substream. This will return an error if the substream has + /// not yet been written to. + fn read_substream( + &self, + cx: &mut Context<'_>, + substream: &mut Self::Substream, + buf: &mut [u8], + ) -> Poll> { + let mut inner = self.inner.lock(); + + match inner.connection.read_substream(*substream, buf) { + Ok(bytes) => Poll::Ready(Ok(bytes)), + Err(quinn_proto::ReadError::Reset(_)) => Poll::Ready(Ok(0)), // EOF + Err(quinn_proto::ReadError::Blocked) => { + if let Some(substream) = inner.substreams.get_mut(substream) { + if !substream + .read_waker + .as_ref() + .map_or(false, |w| w.will_wake(cx.waker())) + { + substream.read_waker = Some(cx.waker().clone()); + } + } + Poll::Pending + } + Err(quinn_proto::ReadError::UnknownStream) => { + log::error!( + "The application used a connection that is already being \ + closed. This is a bug in the application or in libp2p." + ); + Poll::Pending + } + } + } + + fn shutdown_substream( + &self, + cx: &mut Context<'_>, + substream_id: &mut Self::Substream, + ) -> Poll> { + let mut inner = self.inner.lock(); + let QuicMuxerInner { + ref mut substreams, + ref mut connection, + .. + } = &mut *inner; + + let substream = substreams + .get_mut(substream_id) + .expect("using a destroyed substream"); + + if substream.write_closed { + return Poll::Ready(Ok(())); + } + + if connection.shutdown_substream(*substream_id).is_err() { + substream.write_closed = true; + return Poll::Ready(Ok(())); + } + let waker = cx.waker(); + match substream.finished_waker.as_mut() { + None => substream.finished_waker = Some(waker.clone()), + Some(w) if w.will_wake(waker) && false => {} + Some(w) => std::mem::replace(w, waker.clone()).wake(), + } + Poll::Pending + } + + fn destroy_substream(&self, substream: Self::Substream) { + let mut inner = self.inner.lock(); + inner.substreams.remove(&substream); + } + + fn flush_substream( + &self, + _cx: &mut Context<'_>, + _substream: &mut Self::Substream, + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn flush_all(&self, _cx: &mut Context<'_>) -> Poll> { + // TODO: call poll_transmit() and stuff + Poll::Ready(Ok(())) + } + + fn close(&self, cx: &mut Context<'_>) -> Poll> { + // StreamMuxer's `close` documentation mentions that it automatically implies `flush_all`. + if let Poll::Pending = self.flush_all(cx)? { + return Poll::Pending; + } + + // TODO: poll if closed or something + + let mut inner = self.inner.lock(); + if inner.connection.close_reason().is_some() || inner.connection.is_drained() { + return Poll::Ready(Ok(())); + } + span!("closing", side = debug(inner.connection.side())); + if inner.writable_substreams == 0 { + tracing::debug!("closing connection"); + inner.connection.close(); + } else { + tracing::debug!("shutting down pending substreams"); + let QuicMuxerInner { + ref mut substreams, + ref mut connection, + .. + } = &mut *inner; + for (stream_id, waker) in substreams.iter_mut() { + tracing::debug!("shutting down substream {:?}", stream_id); + connection.shutdown_substream(*stream_id); + if let Some(w) = waker.write_waker.take() { + w.wake() + } + waker.write_waker = Some(cx.waker().clone()); + if let Some(w) = waker.finished_waker.take() { + w.wake() + } + waker.finished_waker = Some(cx.waker().clone()); + } + } + + // Register `cx.waker()` as being woken up if the connection closes. + if !inner + .poll_close_waker + .as_ref() + .map_or(false, |w| w.will_wake(cx.waker())) + { + inner.poll_close_waker = Some(cx.waker().clone()); + } + Poll::Pending + } +} + +impl fmt::Debug for QuicMuxer { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("QuicMuxer").finish() + } +} diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs new file mode 100644 index 00000000000..de0976b7b41 --- /dev/null +++ b/transports/quic/src/transport.rs @@ -0,0 +1,198 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the [`Transport`] trait for QUIC. +//! +//! Combines all the objects in the other modules to implement the trait. + +use crate::{endpoint::Endpoint, muxer::QuicMuxer, upgrade::Upgrade}; + +use either::{Left, Right}; +use futures::prelude::*; +use libp2p_core::{ + multiaddr::{Multiaddr, Protocol}, + transport::{ListenerEvent, TransportError}, + PeerId, Transport, +}; +use std::{net::SocketAddr, pin::Pin, sync::Arc}; + +// We reexport the errors that are exposed in the API. +// All of these types use one another. +pub use crate::connection::Error as Libp2pQuicConnectionError; +pub use quinn_proto::{ + ApplicationClose, ConfigError, ConnectError, ConnectionClose, ConnectionError, + TransportError as QuinnTransportError, TransportErrorCode, +}; + +/// Wraps around an `Arc` and implements the [`Transport`] trait. +/// +/// > **Note**: This type is necessary because Rust unfortunately forbids implementing the +/// > `Transport` trait directly on `Arc`. +#[derive(Debug, Clone)] +pub struct QuicTransport(pub Arc); + +/// Error that can happen on the transport. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Error while trying to reach a remote. + #[error("{0}")] + Reach(ConnectError), + /// Error after the remote has been reached. + #[error("{0}")] + Established(Libp2pQuicConnectionError), +} + +impl Transport for QuicTransport { + type Output = (PeerId, QuicMuxer); + type Error = Error; + type Listener = Pin< + Box, Self::Error>> + Send>, + >; + type ListenerUpgrade = Upgrade; + type Dial = Pin> + Send>>; + + fn listen_on(self, addr: Multiaddr) -> Result> { + // TODO: check address correctness + + // TODO: report the locally opened addresses + + Ok(stream::unfold((), move |()| { + let endpoint = self.0.clone(); + let addr = addr.clone(); + async move { + let event = match endpoint.next_incoming().await { + Left(connec) => { + let remote_addr = socketaddr_to_multiaddr(&connec.remote_addr()); + Ok(ListenerEvent::Upgrade { + upgrade: Upgrade::from_connection(connec), + local_addr: addr.clone(), // TODO: hack + remote_addr, + }) + } + Right(multiaddr) => Ok(ListenerEvent::NewAddress(multiaddr)), + }; + Some((event, ())) + } + }) + .boxed()) + } + + fn dial(self, addr: Multiaddr) -> Result> { + let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { + if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { + return Err(TransportError::MultiaddrNotSupported(addr)); + } + socket_addr + } else { + return Err(TransportError::MultiaddrNotSupported(addr)); + }; + + Ok(async move { + let connection = self.0.dial(socket_addr).await.map_err(Error::Reach)?; + let final_connec = Upgrade::from_connection(connection).await?; + Ok(final_connec) + } + .boxed()) + } +} + +/// Tries to turn a QUIC multiaddress into a UDP [`SocketAddr`]. Returns an error if the format +/// of the multiaddr is wrong. +pub(crate) fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { + let mut iter = addr.iter(); + let proto1 = iter.next().ok_or(())?; + let proto2 = iter.next().ok_or(())?; + let proto3 = iter.next().ok_or(())?; + + if iter.next().is_some() { + return Err(()); + } + + match (proto1, proto2, proto3) { + (Protocol::Ip4(ip), Protocol::Udp(port), Protocol::Quic) => { + Ok(SocketAddr::new(ip.into(), port)) + } + (Protocol::Ip6(ip), Protocol::Udp(port), Protocol::Quic) => { + Ok(SocketAddr::new(ip.into(), port)) + } + _ => Err(()), + } +} + +/// Turns an IP address and port into the corresponding QUIC multiaddr. +pub(crate) fn socketaddr_to_multiaddr(socket_addr: &SocketAddr) -> Multiaddr { + Multiaddr::empty() + .with(socket_addr.ip().into()) + .with(Protocol::Udp(socket_addr.port())) + .with(Protocol::Quic) +} + +#[cfg(test)] +#[test] +fn multiaddr_to_udp_conversion() { + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + + assert!( + multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::().unwrap()).is_err() + ); + + assert_eq!( + multiaddr_to_socketaddr( + &"/ip4/127.0.0.1/udp/12345/quic" + .parse::() + .unwrap() + ), + Ok(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 12345, + )) + ); + assert_eq!( + multiaddr_to_socketaddr( + &"/ip4/255.255.255.255/udp/8080/quic" + .parse::() + .unwrap() + ), + Ok(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), + 8080, + )) + ); + assert_eq!( + multiaddr_to_socketaddr(&"/ip6/::1/udp/12345/quic".parse::().unwrap()), + Ok(SocketAddr::new( + IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), + 12345, + )) + ); + assert_eq!( + multiaddr_to_socketaddr( + &"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/udp/8080/quic" + .parse::() + .unwrap() + ), + Ok(SocketAddr::new( + IpAddr::V6(Ipv6Addr::new( + 65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535, + )), + 8080, + )) + ); +} diff --git a/transports/quic/src/upgrade.rs b/transports/quic/src/upgrade.rs new file mode 100644 index 00000000000..f40cf41ab83 --- /dev/null +++ b/transports/quic/src/upgrade.rs @@ -0,0 +1,88 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Future that drives a QUIC connection until is has performed its TLS handshake. + +use crate::{ + connection::{Connection, ConnectionEvent}, + muxer::QuicMuxer, + transport, x509, +}; + +use futures::prelude::*; +use libp2p_core::PeerId; +use std::{ + fmt, + pin::Pin, + task::{Context, Poll}, +}; + +/// A QUIC connection currently being negotiated. +pub struct Upgrade { + connection: Option, +} + +impl Upgrade { + /// Builds an [`Upgrade`] that wraps around a [`Connection`]. + pub(crate) fn from_connection(connection: Connection) -> Self { + Upgrade { + connection: Some(connection), + } + } +} + +impl Future for Upgrade { + type Output = Result<(PeerId, QuicMuxer), transport::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let connection = match self.connection.as_mut() { + Some(c) => c, + None => panic!("Future polled after it has ended"), + }; + + loop { + match Connection::poll_event(connection, cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(ConnectionEvent::Connected) => { + let mut certificates = connection.peer_certificates().unwrap(); + let peer_id = + x509::extract_peerid_or_panic(certificates.next().unwrap().as_der()); // TODO: bad API + let muxer = QuicMuxer::from_connection(self.connection.take().unwrap()); + self.connection = None; + return Poll::Ready(Ok((peer_id, muxer))); + } + Poll::Ready(ConnectionEvent::ConnectionLost(err)) => { + self.connection = None; + return Poll::Ready(Err(transport::Error::Established(err))); + } + Poll::Ready(ConnectionEvent::StreamOpened) + | Poll::Ready(ConnectionEvent::StreamReadable(_)) => continue, + // TODO: enumerate the items and explain how they can't happen + Poll::Ready(e) => unreachable!("{:?}", e), + } + } + } +} + +impl fmt::Debug for Upgrade { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.connection, f) + } +} diff --git a/transports/quic/src/x509.rs b/transports/quic/src/x509.rs new file mode 100644 index 00000000000..208da40706e --- /dev/null +++ b/transports/quic/src/x509.rs @@ -0,0 +1,89 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! TLS configuration for `libp2p-quic`. + +mod certificate; +mod verifier; + +use std::sync::Arc; +use thiserror::Error; + +pub use verifier::extract_peerid_or_panic; + +const LIBP2P_SIGNING_PREFIX: [u8; 21] = *b"libp2p-tls-handshake:"; +const LIBP2P_SIGNING_PREFIX_LENGTH: usize = LIBP2P_SIGNING_PREFIX.len(); +const LIBP2P_OID_BYTES: &[u8] = &[43, 6, 1, 4, 1, 131, 162, 90, 1, 1]; + +/// Error creating a configuration +// TODO: remove this; what is the user supposed to do with this error? +#[derive(Debug, Error)] +pub enum ConfigError { + /// TLS private key or certificate rejected + #[error("TLS private or certificate key rejected: {0}")] + TLSError(#[from] rustls::TLSError), + /// Signing failed + #[error("Signing failed: {0}")] + SigningError(#[from] libp2p_core::identity::error::SigningError), + /// Certificate generation error + #[error("Certificate generation error: {0}")] + RcgenError(#[from] rcgen::RcgenError), +} + +fn make_client_config( + certificate: rustls::Certificate, + key: rustls::PrivateKey, + verifier: Arc, +) -> Result { + let mut crypto = rustls::ClientConfig::new(); + crypto.versions = vec![rustls::ProtocolVersion::TLSv1_3]; + crypto.alpn_protocols = vec![b"libp2p".to_vec()]; + crypto.enable_early_data = false; + crypto.set_single_client_cert(vec![certificate], key)?; + crypto.dangerous().set_certificate_verifier(verifier); + Ok(crypto) +} + +fn make_server_config( + certificate: rustls::Certificate, + key: rustls::PrivateKey, + verifier: Arc, +) -> Result { + let mut crypto = rustls::ServerConfig::new(verifier); + crypto.versions = vec![rustls::ProtocolVersion::TLSv1_3]; + crypto.alpn_protocols = vec![b"libp2p".to_vec()]; + crypto.set_single_cert(vec![certificate], key)?; + Ok(crypto) +} + +/// Create TLS client and server configurations for libp2p. +pub fn make_tls_config( + keypair: &libp2p_core::identity::Keypair, +) -> Result<(rustls::ClientConfig, rustls::ServerConfig), ConfigError> { + let cert = certificate::make_cert(&keypair)?; + let private_key = cert.serialize_private_key_der(); + let verifier = Arc::new(verifier::Libp2pCertificateVerifier); + let cert = rustls::Certificate(cert.serialize_der()?); + let key = rustls::PrivateKey(private_key); + Ok(( + make_client_config(cert.clone(), key.clone(), verifier.clone())?, + make_server_config(cert, key, verifier)?, + )) +} diff --git a/transports/quic/src/x509/certificate.rs b/transports/quic/src/x509/certificate.rs new file mode 100644 index 00000000000..ba26291832d --- /dev/null +++ b/transports/quic/src/x509/certificate.rs @@ -0,0 +1,98 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Certificate handling for libp2p +//! +//! This handles generation, signing, and verification. +//! +//! This crate uses the `log` crate to emit log output. Events that will occur +//! normally are output at `trace` level, while “expected” error conditions +//! (ones that can result during correct use of the library) are logged at +//! `debug` level. + +use super::LIBP2P_SIGNING_PREFIX_LENGTH; +use libp2p_core::identity; + +const LIBP2P_OID: &[u64] = &[1, 3, 6, 1, 4, 1, 53594, 1, 1]; +const LIBP2P_SIGNATURE_ALGORITHM_PUBLIC_KEY_LENGTH: usize = 65; +static LIBP2P_SIGNATURE_ALGORITHM: &rcgen::SignatureAlgorithm = &rcgen::PKCS_ECDSA_P256_SHA256; +// preferred, but not supported by rustls yet +//const LIBP2P_SIGNATURE_ALGORITHM_PUBLIC_KEY_LENGTH: usize = 32; +//static LIBP2P_SIGNATURE_ALGORITHM: &rcgen::SignatureAlgorithm = +// &rcgen::PKCS_ED25519 + +/// Generates a self-signed TLS certificate that includes a libp2p-specific +/// certificate extension containing the public key of the given keypair. +pub(crate) fn make_cert( + keypair: &identity::Keypair, +) -> Result { + // Keypair used to sign the certificate. + let certif_keypair = rcgen::KeyPair::generate(&LIBP2P_SIGNATURE_ALGORITHM)?; + + // The libp2p-specific extension to the certificate contains a signature of the public key + // of the certificate using the libp2p private key. + let libp2p_ext_signature = { + let certif_pubkey = certif_keypair.public_key_raw(); + assert_eq!( + certif_pubkey.len(), + LIBP2P_SIGNATURE_ALGORITHM_PUBLIC_KEY_LENGTH, + "ed25519 public keys are {} bytes", + LIBP2P_SIGNATURE_ALGORITHM_PUBLIC_KEY_LENGTH + ); + + let mut buf = + [0u8; LIBP2P_SIGNING_PREFIX_LENGTH + LIBP2P_SIGNATURE_ALGORITHM_PUBLIC_KEY_LENGTH]; + buf[..LIBP2P_SIGNING_PREFIX_LENGTH].copy_from_slice(&super::LIBP2P_SIGNING_PREFIX[..]); + buf[LIBP2P_SIGNING_PREFIX_LENGTH..].copy_from_slice(certif_pubkey); + keypair.sign(&buf)? + }; + + // Generate the libp2p-specific extension. + let libp2p_extension: rcgen::CustomExtension = { + let extension_content = { + let serialized_pubkey = keypair.public().into_protobuf_encoding(); + yasna::construct_der(|writer| { + writer.write_sequence(|writer| { + writer + .next() + .write_bitvec_bytes(&serialized_pubkey, serialized_pubkey.len() * 8); + writer + .next() + .write_bitvec_bytes(&libp2p_ext_signature, libp2p_ext_signature.len() * 8); + }) + }) + }; + + let mut ext = rcgen::CustomExtension::from_oid_content(LIBP2P_OID, extension_content); + ext.set_criticality(true); + ext + }; + + let certificate = { + let mut params = rcgen::CertificateParams::new(vec![]); + params.distinguished_name = rcgen::DistinguishedName::new(); + params.custom_extensions.push(libp2p_extension); + params.alg = &LIBP2P_SIGNATURE_ALGORITHM; + params.key_pair = Some(certif_keypair); + rcgen::Certificate::from_params(params)? + }; + + Ok(certificate) +} diff --git a/transports/quic/src/x509/verifier.rs b/transports/quic/src/x509/verifier.rs new file mode 100644 index 00000000000..0dd911f9b62 --- /dev/null +++ b/transports/quic/src/x509/verifier.rs @@ -0,0 +1,234 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use libp2p_core::identity::PublicKey; +use ring::io::der; +use rustls::{ + internal::msgs::handshake::DigitallySignedStruct, Certificate, ClientCertVerified, + HandshakeSignatureValid, ServerCertVerified, TLSError, +}; +use untrusted::{Input, Reader}; +use webpki::Error; + +/// Libp2p client and server certificate verifier. +pub(crate) struct Libp2pCertificateVerifier; + +/// libp2p requires the following of X.509 server certificate chains: +/// +/// * Exactly one certificate must be presented. +/// * The certificate must be self-signed. +/// * The certificate must have a valid libp2p extension that includes a +/// signature of its public key. +/// +/// The check that the [`PeerId`] matches the expected [`PeerId`] must be done by +/// the caller. +/// +/// [`PeerId`]: libp2p_core::PeerId +impl rustls::ServerCertVerifier for Libp2pCertificateVerifier { + fn verify_server_cert( + &self, + _roots: &rustls::RootCertStore, + presented_certs: &[rustls::Certificate], + _dns_name: webpki::DNSNameRef<'_>, + _ocsp_response: &[u8], + ) -> Result { + verify_presented_certs(presented_certs).map(|()| ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &Certificate, + _dss: &DigitallySignedStruct, + ) -> Result { + panic!("got asked to verify a TLS1.2 signature, but TLS1.2 was disabled") + } + + fn verify_tls13_signature( + &self, + message: &[u8], + cert: &Certificate, + dss: &DigitallySignedStruct, + ) -> Result { + verify_tls13_signature(message, cert, dss) + } +} + +/// libp2p requires the following of X.509 client certificate chains: +/// +/// * Exactly one certificate must be presented. In particular, client +/// authentication is mandatory in libp2p. +/// * The certificate must be self-signed. +/// * The certificate must have a valid libp2p extension that includes a +/// signature of its public key. +/// +/// The check that the [`PeerId`] matches the expected [`PeerId`] must be done by +/// the caller. +/// +/// [`PeerId`]: libp2p_core::PeerId +impl rustls::ClientCertVerifier for Libp2pCertificateVerifier { + fn offer_client_auth(&self) -> bool { + true + } + + fn client_auth_root_subjects( + &self, + _dns_name: Option<&webpki::DNSName>, + ) -> Option { + Some(vec![]) + } + + fn verify_client_cert( + &self, + presented_certs: &[Certificate], + _dns_name: Option<&webpki::DNSName>, + ) -> Result { + verify_presented_certs(presented_certs).map(|()| ClientCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &Certificate, + _dss: &DigitallySignedStruct, + ) -> Result { + panic!("got asked to verify a TLS1.2 signature, but TLS1.2 was disabled") + } + + fn verify_tls13_signature( + &self, + message: &[u8], + cert: &Certificate, + dss: &DigitallySignedStruct, + ) -> Result { + x509_signature::parse_certificate(cert.as_ref()) + .map_err(rustls::TLSError::WebPKIError)? + .check_tls13_signature(dss.scheme, message, dss.sig.0.as_ref()) + .map_err(rustls::TLSError::WebPKIError) + .map(|()| rustls::HandshakeSignatureValid::assertion()) + } +} + +fn verify_tls13_signature( + message: &[u8], + cert: &Certificate, + dss: &DigitallySignedStruct, +) -> Result { + x509_signature::parse_certificate(cert.as_ref()) + .map_err(rustls::TLSError::WebPKIError)? + .check_tls13_signature(dss.scheme, message, dss.sig.0.as_ref()) + .map_err(rustls::TLSError::WebPKIError) + .map(|()| rustls::HandshakeSignatureValid::assertion()) +} + +fn verify_libp2p_signature( + libp2p_extension: &Libp2pExtension<'_>, + x509_pkey_bytes: &[u8], +) -> Result<(), Error> { + let mut v = Vec::with_capacity(super::LIBP2P_SIGNING_PREFIX_LENGTH + x509_pkey_bytes.len()); + v.extend_from_slice(&super::LIBP2P_SIGNING_PREFIX[..]); + v.extend_from_slice(x509_pkey_bytes); + if libp2p_extension + .peer_key + .verify(&v, libp2p_extension.signature) + { + Ok(()) + } else { + Err(Error::UnknownIssuer) + } +} + +fn parse_certificate( + certificate: &[u8], +) -> Result<(x509_signature::X509Certificate<'_>, Libp2pExtension<'_>), Error> { + let parsed = x509_signature::parse_certificate(certificate)?; + let mut libp2p_extension = None; + + parsed + .extensions() + .iterate(&mut |oid, critical, extension| { + Ok(match oid { + super::LIBP2P_OID_BYTES if libp2p_extension.is_some() => return Err(Error::BadDER), + super::LIBP2P_OID_BYTES => { + libp2p_extension = Some(parse_libp2p_extension(extension)?) + } + _ if critical => return Err(Error::UnsupportedCriticalExtension), + _ => {} + }) + })?; + let libp2p_extension = libp2p_extension.ok_or(Error::UnknownIssuer)?; + Ok((parsed, libp2p_extension)) +} + +fn verify_presented_certs(presented_certs: &[Certificate]) -> Result<(), TLSError> { + if presented_certs.len() != 1 { + return Err(TLSError::NoCertificatesPresented); + } + let (certificate, extension) = + parse_certificate(presented_certs[0].as_ref()).map_err(TLSError::WebPKIError)?; + certificate.valid().map_err(TLSError::WebPKIError)?; + certificate + .check_self_issued() + .map_err(TLSError::WebPKIError)?; + verify_libp2p_signature(&extension, certificate.subject_public_key_info().key()) + .map_err(TLSError::WebPKIError) +} + +struct Libp2pExtension<'a> { + peer_key: PublicKey, + signature: &'a [u8], +} + +fn parse_libp2p_extension<'a>(extension: Input<'a>) -> Result, Error> { + fn read_bit_string<'a>(input: &mut Reader<'a>, e: Error) -> Result, Error> { + der::bit_string_with_no_unused_bits(input).map_err(|_| e) + } + + let e = Error::ExtensionValueInvalid; + Input::read_all(&extension, e, |input| { + der::nested(input, der::Tag::Sequence, e, |input| { + let public_key = read_bit_string(input, e)?.as_slice_less_safe(); + let signature = read_bit_string(input, e)?.as_slice_less_safe(); + // We deliberately discard the error information because this is + // either a broken peer or an attack. + let peer_key = PublicKey::from_protobuf_encoding(public_key).map_err(|_| e)?; + Ok(Libp2pExtension { + signature, + peer_key, + }) + }) + }) +} + +/// Extracts the [`PeerId`] from a certificate’s libp2p extension. It is erroneous +/// to call this unless the certificate is known to be a well-formed X.509 +/// certificate with a valid libp2p extension. The certificate verifiers in this +/// crate validate check this. +/// +/// # Panics +/// +/// Panics if called on an invalid certificate. +/// +/// [`PeerId`]: libp2p_core::PeerId +pub fn extract_peerid_or_panic(certificate: &[u8]) -> libp2p_core::PeerId { + let r = parse_certificate(certificate) + .expect("we already checked that the certificate was valid during the handshake; qed"); + libp2p_core::PeerId::from_public_key(r.1.peer_key) +} diff --git a/transports/quic/tests/tests.rs b/transports/quic/tests/tests.rs new file mode 100644 index 00000000000..496ad9b824c --- /dev/null +++ b/transports/quic/tests/tests.rs @@ -0,0 +1,342 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::prelude::*; +use libp2p_core::{ + multiaddr::{Multiaddr, Protocol}, + muxing::StreamMuxer, + transport::ListenerEvent, + transport::Transport, +}; +use libp2p_quic::{Config, Endpoint, QuicMuxer, QuicTransport}; + +use std::{ + io::Result, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tracing::{debug, error, info, trace}; + +#[derive(Debug)] +struct QuicStream<'a> { + id: Option, + muxer: &'a QuicMuxer, + shutdown: bool, +} + +impl<'a> AsyncWrite for QuicStream<'a> { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + assert!(!self.shutdown, "written after close"); + let Self { muxer, id, .. } = self.get_mut(); + let _ = muxer.poll_inbound(cx).is_pending(); + muxer + .write_substream(cx, id.as_mut().unwrap(), buf) + .map_err(From::from) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.shutdown = true; + let Self { muxer, id, .. } = self.get_mut(); + let _ = muxer.poll_inbound(cx).is_pending(); + debug!("trying to close {:?}", id); + match muxer.shutdown_substream(cx, id.as_mut().unwrap()) { + Poll::Pending => return Poll::Pending, + Poll::Ready(e) => e?, + }; + debug!("closed {:?}", id); + Poll::Ready(Ok(())) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +impl<'a> AsyncRead for QuicStream<'a> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let Self { id, muxer, .. } = self.get_mut(); + let _ = muxer.poll_inbound(cx).is_pending(); + muxer + .read_substream(cx, id.as_mut().unwrap(), buf) + .map_err(From::from) + } +} + +impl<'a> Drop for QuicStream<'a> { + fn drop(&mut self) { + match self.id.take() { + None => {} + Some(id) => self.muxer.destroy_substream(id), + } + } +} + +struct Outbound<'a>(&'a QuicMuxer); + +impl<'a> futures::Future for Outbound<'a> { + type Output = Result>; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Outbound(conn) = &mut *self; + let _ = conn.poll_inbound(cx); + conn.poll_outbound(cx, &mut ()) + .map_ok(|id| QuicStream { + id: Some(id), + muxer: self.get_mut().0.clone(), + shutdown: false, + }) + .map_err(From::from) + } +} + +#[derive(Debug)] +struct Inbound<'a>(&'a QuicMuxer); +impl<'a> futures::Stream for Inbound<'a> { + type Item = QuicStream<'a>; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + debug!("polling for inbound connections"); + self.0.poll_inbound(cx).map(|id| { + Some(QuicStream { + id: Some(id.expect("bug")), + muxer: self.get_mut().0.clone(), + shutdown: false, + }) + }) + } +} + +fn init() { + use tracing_subscriber::{fmt::Subscriber, EnvFilter}; + let _ = Subscriber::builder() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); +} + +struct Closer(Arc); + +impl Future for Closer { + type Output = Result<()>; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let i = &mut self.get_mut().0; + i.close(cx).map_err(From::from) + } +} + +#[test] +fn wildcard_expansion() { + init(); + let addr: Multiaddr = "/ip4/0.0.0.0/udp/1234/quic".parse().unwrap(); + let keypair = libp2p_core::identity::Keypair::generate_ed25519(); + let listener = + QuicTransport(Endpoint::new(Config::new(&keypair, addr.clone()).unwrap()).unwrap()); + let mut incoming = listener.listen_on(addr).unwrap(); + // Process all initial `NewAddress` events and make sure they + // do not contain wildcard address or port. + futures::executor::block_on(async move { + while let Some(event) = incoming.next().await.map(|e| e.unwrap()) { + match event { + ListenerEvent::NewAddress(a) => { + let mut iter = a.iter(); + match iter.next().expect("ip address") { + Protocol::Ip4(ip) => assert!(!ip.is_unspecified()), + Protocol::Ip6(ip) => assert!(!ip.is_unspecified()), + other => panic!("Unexpected protocol: {}", other), + } + if let Protocol::Udp(port) = iter.next().expect("port") { + assert_ne!(0, port) + } else { + panic!("No UDP port in address: {}", a) + } + assert_eq!(iter.next(), Some(Protocol::Quic)); + assert_eq!(iter.next(), None); + } + ListenerEvent::Upgrade { .. } => panic!(), + ListenerEvent::AddressExpired { .. } => panic!(), + ListenerEvent::Error { .. } => panic!(), + } + break; + } + drop(incoming); + }); +} + +#[test] +fn communicating_between_dialer_and_listener() { + init(); + for i in 0..1000u32 { + trace!("running a test"); + do_test(i) + } +} + +fn do_test(_i: u32) { + let (ready_tx, ready_rx) = futures::channel::oneshot::channel(); + let mut ready_tx = Some(ready_tx); + let keypair = libp2p_core::identity::Keypair::generate_ed25519(); + let keypair2 = keypair.clone(); + let addr: Multiaddr = "/ip4/127.0.0.1/udp/0/quic".parse().expect("bad address?"); + let quic_config = Config::new(&keypair2, addr.clone()).unwrap(); + let quic_endpoint = Endpoint::new(quic_config).unwrap(); + let mut listener = QuicTransport(quic_endpoint.clone()) + .listen_on(addr) + .unwrap(); + error!("running tests"); + let handle = async_std::task::spawn(async move { + let key = loop { + trace!("awaiting connection"); + match listener.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(listen_addr) => { + if let Some(channel) = ready_tx.take() { + channel.send(listen_addr).unwrap(); + } + } + ListenerEvent::Upgrade { upgrade, .. } => { + info!("got a connection upgrade!"); + let (id, muxer): (_, QuicMuxer) = upgrade.await.expect("upgrade failed"); + info!("got a new muxer!"); + let muxer = Arc::new(muxer); + let mut socket: QuicStream = + Inbound(&*muxer).next().await.expect("no incoming stream"); + let mut buf = [0u8; 3]; + debug!("reading data from accepted stream!"); + { + let mut count = 0; + while count < buf.len() { + count += socket.read(&mut buf[count..]).await.unwrap(); + } + } + assert_eq!(buf, [4, 5, 6]); + debug!("writing data!"); + socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap(); + debug!("data written!"); + socket.close().await.unwrap(); + assert_eq!(socket.read(&mut buf).await.unwrap(), 0); + debug!("end of stream"); + drop(socket); + // Closer(muxer).await.unwrap(); + debug!("finished!"); + break id; + } + _ => unreachable!(), + } + }; + drop(listener); + drop(quic_endpoint); + key + }); + + let second_handle = async_std::task::spawn(async move { + let addr = ready_rx.await.unwrap(); + let quic_config = + Config::new(&keypair, "/ip4/127.0.0.1/udp/0/quic".parse().unwrap()).unwrap(); + let quic_endpoint = QuicTransport(Endpoint::new(quic_config).unwrap()); + // Obtain a future socket through dialing + error!("Dialing a Connection: {:?}", addr); + let (peer_id, connection) = quic_endpoint.dial(addr.clone()).unwrap().await.unwrap(); + let connection = Arc::new(connection); + trace!("Received a Connection: {:?}", connection); + let () = connection.open_outbound(); + let mut stream = Outbound(&*connection).await.expect("failed"); + + debug!("opened a stream: id {:?}", stream.id); + stream.write_all(&[4u8, 5, 6]).await.unwrap(); + stream.close().await.unwrap(); + let mut buf = [0u8; 3]; + debug!("reading data!"); + { + let mut count = 0; + while count < buf.len() { + let read = stream.read(&mut buf[count..]).await.unwrap(); + assert_ne!(read, 0usize, "premature end of file"); + count += read; + } + } + assert_eq!(buf, [1u8, 2, 3]); + debug!("data read ― checking for EOF"); + assert_eq!(stream.read(&mut buf).await.unwrap(), 0); + drop(stream); + debug!("have EOF"); + // Closer(connection).await.expect("closed successfully"); + debug!("awaiting handle"); + peer_id + }); + assert_eq!( + async_std::task::block_on(handle), + async_std::task::block_on(second_handle) + ); +} + +#[test] +fn replace_port_0_in_returned_multiaddr_ipv4() { + init(); + let keypair = libp2p_core::identity::Keypair::generate_ed25519(); + + let addr = "/ip4/127.0.0.1/udp/0/quic".parse::().unwrap(); + assert!(addr.to_string().ends_with("udp/0/quic")); + + let config = Config::new(&keypair, addr.clone()).unwrap(); + let quic = QuicTransport(Endpoint::new(config).expect("no error")); + + let new_addr = futures::executor::block_on_stream(quic.listen_on(addr).unwrap()) + .next() + .expect("some event") + .expect("no error") + .into_new_address() + .expect("listen address"); + + if new_addr.to_string().contains("udp/0") { + panic!("failed to expand address ― got {}", new_addr); + } +} + +#[test] +fn replace_port_0_in_returned_multiaddr_ipv6() { + init(); + let keypair = libp2p_core::identity::Keypair::generate_ed25519(); + + let addr: Multiaddr = "/ip6/::1/udp/0/quic".parse().unwrap(); + assert!(addr.to_string().contains("udp/0/quic")); + let config = Config::new(&keypair, addr.clone()).unwrap(); + let quic = QuicTransport(Endpoint::new(config).expect("no error")); + + let new_addr = futures::executor::block_on_stream(quic.listen_on(addr).unwrap()) + .next() + .expect("some event") + .expect("no error") + .into_new_address() + .expect("listen address"); + + assert!(!new_addr.to_string().contains("udp/0")); +} + +#[test] +fn larger_addr_denied() { + init(); + let keypair = libp2p_core::identity::Keypair::generate_ed25519(); + let addr = "/ip4/127.0.0.1/tcp/12345/tcp/12345" + .parse::() + .unwrap(); + let config = Config::new(&keypair, addr).unwrap(); + assert!(Endpoint::new(config).is_err()) +} diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 1e870f78c6a..f2ce66d7ee8 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -31,11 +31,10 @@ use futures::{future::{self, Ready}, prelude::*}; use futures_timer::Delay; -use get_if_addrs::{IfAddr, get_if_addrs}; -use ipnet::{IpNet, Ipv4Net, Ipv6Net}; +use ipnet::IpNet; use libp2p_core::{ Transport, - multiaddr::{Protocol, Multiaddr}, + multiaddr::{Protocol, Multiaddr, host_addresses}, transport::{ListenerEvent, TransportError} }; use log::{debug, trace}; @@ -44,7 +43,6 @@ use std::{ collections::VecDeque, convert::TryFrom, io, - iter::{self, FromIterator}, net::{IpAddr, SocketAddr}, pin::Pin, task::{Context, Poll}, @@ -134,11 +132,11 @@ impl Transport for $tcp_config { // as reported by `get_if_addrs`. let addrs = if socket_addr.ip().is_unspecified() { - let addrs = host_addresses(port)?; + let addrs = host_addresses(&[Protocol::Tcp(port)])?; debug!("Listening on {:?}", addrs.iter().map(|(_, _, ma)| ma).collect::>()); Addresses::Many(addrs) } else { - let ma = ip_to_multiaddr(local_addr.ip(), port); + let ma = Multiaddr::from(local_addr.ip()).with(Protocol::Tcp(port)); debug!("Listening on {:?}", ma); Addresses::One(ma) }; @@ -257,7 +255,7 @@ impl $tcp_listen_stream { return (Ok(ListenerEvent::Error(err)), self); } } - ip_to_multiaddr(sock_addr.ip(), sock_addr.port()) + Multiaddr::from(sock_addr.ip()).with(Protocol::Tcp(sock_addr.port())) } Err(err) => { debug!("Failed to get local address of incoming socket: {:?}", err); @@ -265,7 +263,7 @@ impl $tcp_listen_stream { } }; - let remote_addr = ip_to_multiaddr(sock_addr.ip(), sock_addr.port()); + let remote_addr = Multiaddr::from(sock_addr.ip()).with(Protocol::Tcp(sock_addr.port())); match $apply_config(&self.config, &sock) { Ok(()) => { @@ -389,41 +387,6 @@ fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { } } -// Create a [`Multiaddr`] from the given IP address and port number. -fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr { - let proto = match ip { - IpAddr::V4(ip) => Protocol::Ip4(ip), - IpAddr::V6(ip) => Protocol::Ip6(ip) - }; - let it = iter::once(proto).chain(iter::once(Protocol::Tcp(port))); - Multiaddr::from_iter(it) -} - -// Collect all local host addresses and use the provided port number as listen port. -fn host_addresses(port: u16) -> io::Result> { - let mut addrs = Vec::new(); - for iface in get_if_addrs()? { - let ip = iface.ip(); - let ma = ip_to_multiaddr(ip, port); - let ipn = match iface.addr { - IfAddr::V4(ip4) => { - let prefix_len = (!u32::from_be_bytes(ip4.netmask.octets())).leading_zeros(); - let ipnet = Ipv4Net::new(ip4.ip, prefix_len as u8) - .expect("prefix_len is the number of bits in a u32, so can not exceed 32"); - IpNet::V4(ipnet) - } - IfAddr::V6(ip6) => { - let prefix_len = (!u128::from_be_bytes(ip6.netmask.octets())).leading_zeros(); - let ipnet = Ipv6Net::new(ip6.ip, prefix_len as u8) - .expect("prefix_len is the number of bits in a u128, so can not exceed 128"); - IpNet::V6(ipnet) - } - }; - addrs.push((ip, ipn, ma)) - } - Ok(addrs) -} - /// Listen address information. #[derive(Debug)] enum Addresses { @@ -459,7 +422,7 @@ fn check_for_interface_changes( // and expired addresses. // // TODO: We do not detect expired addresses unless there is a new address. - let old_listen_addrs = std::mem::replace(listen_addrs, host_addresses(listen_port)?); + let old_listen_addrs = std::mem::replace(listen_addrs, host_addresses(&[Protocol::Tcp(listen_port)])?); // Check for addresses no longer in use. for (ip, _, ma) in old_listen_addrs.iter() { diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index 4d2f9a7ac07..2fe8f0c7bdf 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -309,7 +309,7 @@ where let stream = self.tls_config.client.connect(&dns_name, stream) .map_err(|e| { debug!("TLS handshake with {} failed: {}", address, e); - Error::Tls(tls::Error::from(e)) + Error::Tls(e.into()) }) .await?;