From c03132d75e80b569c2fd8447539ac0b7bddc8ecc Mon Sep 17 00:00:00 2001 From: Andrew Schran Date: Wed, 13 Nov 2024 16:51:15 -0500 Subject: [PATCH] Add support for TLS connections with self-signed cert on validator gRPC interface (#20238) Client side use of TLS will be enabled-by-default in a future PR. This has corrected TLS upgrade behavior compared to older PR #19796. --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [x] Nodes (Validators and Full nodes): Adds support for TLS on validator gRPC interface. - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- Cargo.lock | 17 +- Cargo.toml | 1 + consensus/core/src/network/tonic_network.rs | 28 +- consensus/core/src/network/tonic_tls.rs | 58 +-- crates/mysten-network/Cargo.toml | 3 + crates/mysten-network/src/client.rs | 87 +++- crates/mysten-network/src/config.rs | 17 +- crates/mysten-network/src/server.rs | 250 +++++++++--- crates/sui-core/Cargo.toml | 1 + crates/sui-core/src/authority_client.rs | 37 +- crates/sui-core/src/authority_server.rs | 9 +- .../src/unit_tests/authority_tests.rs | 15 +- .../src/unit_tests/consensus_tests.rs | 49 --- .../sui-core/src/unit_tests/server_tests.rs | 17 +- .../src/unit_tests/transaction_tests.rs | 108 +++-- crates/sui-node/src/lib.rs | 8 +- crates/sui-swarm/Cargo.toml | 1 + crates/sui-swarm/src/memory/node.rs | 8 +- crates/sui-tls/src/lib.rs | 43 +- crates/sui-tls/src/verifier.rs | 20 +- crates/sui-tool/Cargo.toml | 1 + crates/sui-tool/src/lib.rs | 8 +- crates/sui-types/src/committee.rs | 11 +- .../epoch_start_sui_system_state.rs | 1 + .../simtest_sui_system_state_inner.rs | 3 + .../sui_system_state_inner_v1.rs | 1 + .../sui_system_state_inner_v2.rs | 1 + .../sui_system_state_summary.rs | 5 + .../tests/consensus_integration_tests.rs | 238 ----------- .../primary/tests/causal_completion_tests.rs | 161 -------- .../tests/nodes_bootstrapping_tests.rs | 299 -------------- narwhal/test-utils/src/cluster.rs | 1 + narwhal/worker/src/lib.rs | 1 - narwhal/worker/src/tests/worker_tests.rs | 371 ------------------ narwhal/worker/src/transactions_server.rs | 206 ---------- narwhal/worker/src/worker.rs | 22 +- 36 files changed, 569 insertions(+), 1538 deletions(-) delete mode 100644 narwhal/executor/tests/consensus_integration_tests.rs delete mode 100644 narwhal/primary/tests/causal_completion_tests.rs delete mode 100644 narwhal/primary/tests/nodes_bootstrapping_tests.rs delete mode 100644 narwhal/worker/src/tests/worker_tests.rs delete mode 100644 narwhal/worker/src/transactions_server.rs diff --git a/Cargo.lock b/Cargo.lock index f6aacea48171a..ab6e641cb5538 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -967,23 +967,24 @@ dependencies = [ [[package]] name = "async-stream" -version = "0.3.3" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" dependencies = [ "async-stream-impl", "futures-core", + "pin-project-lite", ] [[package]] name = "async-stream-impl" -version = "0.3.3" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2 1.0.87", "quote 1.0.35", - "syn 1.0.107", + "syn 2.0.79", ] [[package]] @@ -8200,11 +8201,13 @@ name = "mysten-network" version = "0.2.0" dependencies = [ "anemo", + "async-stream", "bcs", "bytes", "eyre", "futures", "http 1.1.0", + "hyper-rustls 0.27.2", "hyper-util", "multiaddr", "once_cell", @@ -8212,6 +8215,7 @@ dependencies = [ "serde", "snap", "tokio", + "tokio-rustls 0.26.0", "tokio-stream", "tonic 0.12.3", "tonic-health", @@ -13349,6 +13353,7 @@ dependencies = [ "sui-storage", "sui-swarm-config", "sui-test-transaction-builder", + "sui-tls", "sui-transaction-checks", "sui-types", "tap", @@ -15295,6 +15300,7 @@ dependencies = [ "sui-protocol-config", "sui-simulator", "sui-swarm-config", + "sui-tls", "sui-types", "tap", "telemetry-subscribers", @@ -15432,6 +15438,7 @@ dependencies = [ "sui-sdk", "sui-snapshot", "sui-storage", + "sui-tls", "sui-types", "telemetry-subscribers", "tempfile", diff --git a/Cargo.toml b/Cargo.toml index 4a2f8089a5a70..9a0402b6f772c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -262,6 +262,7 @@ async-graphql = "=7.0.1" async-graphql-axum = "=7.0.1" async-graphql-value = "=7.0.1" async-recursion = "1.0.4" +async-stream = "0.3.6" async-trait = "0.1.61" atomic_float = "0.1" aws-config = "0.56" diff --git a/consensus/core/src/network/tonic_network.rs b/consensus/core/src/network/tonic_network.rs index 1c07f013cecc4..59a29a102ff90 100644 --- a/consensus/core/src/network/tonic_network.rs +++ b/consensus/core/src/network/tonic_network.rs @@ -24,6 +24,7 @@ use mysten_network::{ Multiaddr, }; use parking_lot::RwLock; +use sui_tls::AllowPublicKeys; use tokio::{ pin, task::JoinSet, @@ -44,7 +45,6 @@ use super::{ consensus_service_client::ConsensusServiceClient, consensus_service_server::ConsensusService, }, - tonic_tls::create_rustls_client_config, BlockStream, NetworkClient, NetworkManager, NetworkService, }; use crate::{ @@ -54,7 +54,7 @@ use crate::{ error::{ConsensusError, ConsensusResult}, network::{ tonic_gen::consensus_service_server::ConsensusServiceServer, - tonic_tls::create_rustls_server_config, + tonic_tls::certificate_server_name, }, CommitIndex, Round, }; @@ -381,7 +381,16 @@ impl ChannelPool { let address = format!("https://{address}"); let config = &self.context.parameters.tonic; let buffer_size = config.connection_buffer_size; - let client_tls_config = create_rustls_client_config(&self.context, network_keypair, peer); + let client_tls_config = sui_tls::create_rustls_client_config( + self.context + .committee + .authority(peer) + .network_key + .clone() + .into_inner(), + certificate_server_name(&self.context), + Some(network_keypair.private_key().into_inner()), + ); let endpoint = tonic_rustls::Channel::from_shared(address.clone()) .unwrap() .connect_timeout(timeout) @@ -728,8 +737,17 @@ impl NetworkManager for TonicManager { Arc::new(builder) }; - let tls_server_config = - create_rustls_server_config(&self.context, self.network_keypair.clone()); + let tls_server_config = sui_tls::create_rustls_server_config( + self.network_keypair.clone().private_key().into_inner(), + certificate_server_name(&self.context), + AllowPublicKeys::new( + self.context + .committee + .authorities() + .map(|(_i, a)| a.network_key.clone().into_inner()) + .collect(), + ), + ); let tls_acceptor = TlsAcceptor::from(Arc::new(tls_server_config)); // Create listener to incoming connections. diff --git a/consensus/core/src/network/tonic_tls.rs b/consensus/core/src/network/tonic_tls.rs index 13377934e3b18..6e7ff630115ec 100644 --- a/consensus/core/src/network/tonic_tls.rs +++ b/consensus/core/src/network/tonic_tls.rs @@ -2,63 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::context::Context; -use consensus_config::{AuthorityIndex, NetworkKeyPair}; -use sui_tls::AllowPublicKeys; -use tokio_rustls::rustls::{ClientConfig, ServerConfig}; -pub(crate) fn create_rustls_server_config( - context: &Context, - network_keypair: NetworkKeyPair, -) -> ServerConfig { - let allower = AllowPublicKeys::new( - context - .committee - .authorities() - .map(|(_i, a)| a.network_key.clone().into_inner()) - .collect(), - ); - let verifier = sui_tls::ClientCertVerifier::new(allower, certificate_server_name(context)); - // TODO: refactor to use key bytes - let self_signed_cert = sui_tls::SelfSignedCertificate::new( - network_keypair.private_key().into_inner(), - &certificate_server_name(context), - ); - let tls_cert = self_signed_cert.rustls_certificate(); - let tls_private_key = self_signed_cert.rustls_private_key(); - let mut tls_config = verifier - .rustls_server_config(vec![tls_cert], tls_private_key) - .unwrap_or_else(|e| panic!("Failed to create TLS server config: {:?}", e)); - tls_config.alpn_protocols = vec![b"h2".to_vec()]; - tls_config -} - -pub(crate) fn create_rustls_client_config( - context: &Context, - network_keypair: NetworkKeyPair, - target: AuthorityIndex, -) -> ClientConfig { - let target_public_key = context - .committee - .authority(target) - .network_key - .clone() - .into_inner(); - let self_signed_cert = sui_tls::SelfSignedCertificate::new( - network_keypair.private_key().into_inner(), - &certificate_server_name(context), - ); - let tls_cert = self_signed_cert.rustls_certificate(); - let tls_private_key = self_signed_cert.rustls_private_key(); - let mut tls_config = - sui_tls::ServerCertVerifier::new(target_public_key, certificate_server_name(context)) - .rustls_client_config(vec![tls_cert], tls_private_key) - .unwrap_or_else(|e| panic!("Failed to create TLS client config: {:?}", e)); - // ServerCertVerifier sets alpn for completeness, but alpn cannot be predefined when - // using HttpsConnector from hyper-rustls, as in TonicManager. - tls_config.alpn_protocols = vec![]; - tls_config -} - -fn certificate_server_name(context: &Context) -> String { +pub(crate) fn certificate_server_name(context: &Context) -> String { format!("consensus_epoch_{}", context.committee.epoch()) } diff --git a/crates/mysten-network/Cargo.toml b/crates/mysten-network/Cargo.toml index 3fb61694e170f..18426cb914806 100644 --- a/crates/mysten-network/Cargo.toml +++ b/crates/mysten-network/Cargo.toml @@ -9,6 +9,7 @@ publish = false [dependencies] anemo.workspace = true +async-stream.workspace = true bcs.workspace = true bytes.workspace = true eyre.workspace = true @@ -18,8 +19,10 @@ multiaddr.workspace = true serde.workspace = true once_cell.workspace = true snap.workspace = true +hyper-rustls.workspace = true hyper-util.workspace = true tokio = { workspace = true, features = ["sync", "rt", "macros"] } +tokio-rustls.workspace = true tokio-stream.workspace = true tonic.workspace = true tonic-health.workspace = true diff --git a/crates/mysten-network/src/client.rs b/crates/mysten-network/src/client.rs index f0c188f54f21c..8cb508c798431 100644 --- a/crates/mysten-network/src/client.rs +++ b/crates/mysten-network/src/client.rs @@ -21,53 +21,67 @@ use std::{ vec, }; use tokio::task::JoinHandle; +use tokio_rustls::rustls::ClientConfig; use tonic::transport::{Channel, Endpoint, Uri}; use tower::Service; use tracing::{info, trace}; -pub async fn connect(address: &Multiaddr) -> Result { - let channel = endpoint_from_multiaddr(address)?.connect().await?; +pub async fn connect(address: &Multiaddr, tls_config: Option) -> Result { + let channel = endpoint_from_multiaddr(address, tls_config)? + .connect() + .await?; Ok(channel) } -pub fn connect_lazy(address: &Multiaddr) -> Result { - let channel = endpoint_from_multiaddr(address)?.connect_lazy(); +pub fn connect_lazy(address: &Multiaddr, tls_config: Option) -> Result { + let channel = endpoint_from_multiaddr(address, tls_config)?.connect_lazy(); Ok(channel) } -pub(crate) async fn connect_with_config(address: &Multiaddr, config: &Config) -> Result { - let channel = endpoint_from_multiaddr(address)? +pub(crate) async fn connect_with_config( + address: &Multiaddr, + tls_config: Option, + config: &Config, +) -> Result { + let channel = endpoint_from_multiaddr(address, tls_config)? .apply_config(config) .connect() .await?; Ok(channel) } -pub(crate) fn connect_lazy_with_config(address: &Multiaddr, config: &Config) -> Result { - let channel = endpoint_from_multiaddr(address)? +pub(crate) fn connect_lazy_with_config( + address: &Multiaddr, + tls_config: Option, + config: &Config, +) -> Result { + let channel = endpoint_from_multiaddr(address, tls_config)? .apply_config(config) .connect_lazy(); Ok(channel) } -fn endpoint_from_multiaddr(addr: &Multiaddr) -> Result { +fn endpoint_from_multiaddr( + addr: &Multiaddr, + tls_config: Option, +) -> Result { let mut iter = addr.iter(); let channel = match iter.next().ok_or_else(|| eyre!("address is empty"))? { Protocol::Dns(_) => { let (dns_name, tcp_port, http_or_https) = parse_dns(addr)?; let uri = format!("{http_or_https}://{dns_name}:{tcp_port}"); - MyEndpoint::try_from_uri(uri)? + MyEndpoint::try_from_uri(uri, tls_config)? } Protocol::Ip4(_) => { let (socket_addr, http_or_https) = parse_ip4(addr)?; let uri = format!("{http_or_https}://{socket_addr}"); - MyEndpoint::try_from_uri(uri)? + MyEndpoint::try_from_uri(uri, tls_config)? } Protocol::Ip6(_) => { let (socket_addr, http_or_https) = parse_ip6(addr)?; let uri = format!("{http_or_https}://{socket_addr}"); - MyEndpoint::try_from_uri(uri)? + MyEndpoint::try_from_uri(uri, tls_config)? } unsupported => return Err(eyre!("unsupported protocol {unsupported}")), }; @@ -77,21 +91,25 @@ fn endpoint_from_multiaddr(addr: &Multiaddr) -> Result { struct MyEndpoint { endpoint: Endpoint, + tls_config: Option, } static DISABLE_CACHING_RESOLVER: OnceCell = OnceCell::new(); impl MyEndpoint { - fn new(endpoint: Endpoint) -> Self { - Self { endpoint } + fn new(endpoint: Endpoint, tls_config: Option) -> Self { + Self { + endpoint, + tls_config, + } } - fn try_from_uri(uri: String) -> Result { + fn try_from_uri(uri: String, tls_config: Option) -> Result { let uri: Uri = uri .parse() .with_context(|| format!("unable to create Uri from '{uri}'"))?; let endpoint = Endpoint::from(uri); - Ok(Self::new(endpoint)) + Ok(Self::new(endpoint, tls_config)) } fn apply_config(mut self, config: &Config) -> Self { @@ -107,7 +125,17 @@ impl MyEndpoint { }); if disable_caching_resolver { - self.endpoint.connect_lazy() + if let Some(tls_config) = self.tls_config { + self.endpoint.connect_with_connector_lazy( + hyper_rustls::HttpsConnectorBuilder::new() + .with_tls_config(tls_config) + .https_only() + .enable_http2() + .build(), + ) + } else { + self.endpoint.connect_lazy() + } } else { let mut http = HttpConnector::new_with_resolver(CachingResolver::new()); http.enforce_http(false); @@ -115,12 +143,33 @@ impl MyEndpoint { http.set_keepalive(None); http.set_connect_timeout(None); - self.endpoint.connect_with_connector_lazy(http) + if let Some(tls_config) = self.tls_config { + let https = hyper_rustls::HttpsConnectorBuilder::new() + .with_tls_config(tls_config) + .https_only() + .enable_http1() + .wrap_connector(http); + self.endpoint.connect_with_connector_lazy(https) + } else { + self.endpoint.connect_with_connector_lazy(http) + } } } async fn connect(self) -> Result { - self.endpoint.connect().await.map_err(Into::into) + if let Some(tls_config) = self.tls_config { + let https_connector = hyper_rustls::HttpsConnectorBuilder::new() + .with_tls_config(tls_config) + .https_only() + .enable_http2() + .build(); + self.endpoint + .connect_with_connector(https_connector) + .await + .map_err(Into::into) + } else { + self.endpoint.connect().await.map_err(Into::into) + } } } diff --git a/crates/mysten-network/src/config.rs b/crates/mysten-network/src/config.rs index 1e59dbe75bcf8..eab88a024ec41 100644 --- a/crates/mysten-network/src/config.rs +++ b/crates/mysten-network/src/config.rs @@ -9,6 +9,7 @@ use crate::{ use eyre::Result; use serde::{Deserialize, Serialize}; use std::time::Duration; +use tokio_rustls::rustls::ClientConfig; use tonic::transport::Channel; #[derive(Debug, Default, Deserialize, Serialize)] @@ -90,11 +91,19 @@ impl Config { ServerBuilder::from_config(self, metrics_provider) } - pub async fn connect(&self, addr: &Multiaddr) -> Result { - connect_with_config(addr, self).await + pub async fn connect( + &self, + addr: &Multiaddr, + tls_config: Option, + ) -> Result { + connect_with_config(addr, tls_config, self).await } - pub fn connect_lazy(&self, addr: &Multiaddr) -> Result { - connect_lazy_with_config(addr, self) + pub fn connect_lazy( + &self, + addr: &Multiaddr, + tls_config: Option, + ) -> Result { + connect_lazy_with_config(addr, tls_config, self) } } diff --git a/crates/mysten-network/src/server.rs b/crates/mysten-network/src/server.rs index 4bac6fe61ae52..d60a3b4637ebf 100644 --- a/crates/mysten-network/src/server.rs +++ b/crates/mysten-network/src/server.rs @@ -9,11 +9,16 @@ use crate::{ multiaddr::{parse_dns, parse_ip4, parse_ip6, Multiaddr, Protocol}, }; use eyre::{eyre, Result}; -use futures::FutureExt; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, Stream, StreamExt}; +use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use std::{convert::Infallible, net::SocketAddr}; -use tokio::net::{TcpListener, ToSocketAddrs}; -use tokio_stream::wrappers::TcpListenerStream; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; +use tokio_rustls::rustls::ServerConfig; +use tokio_rustls::{server::TlsStream, TlsAcceptor}; use tonic::codegen::http::HeaderValue; use tonic::{ body::BoxBody, @@ -35,6 +40,7 @@ use tower_http::classify::{GrpcErrorsAsFailures, SharedClassifier}; use tower_http::propagate_header::PropagateHeaderLayer; use tower_http::set_header::SetRequestHeaderLayer; use tower_http::trace::{DefaultMakeSpan, DefaultOnBodyChunk, DefaultOnEos, TraceLayer}; +use tracing::debug; pub struct ServerBuilder { router: Router>, @@ -155,46 +161,48 @@ impl ServerBuilder { self } - pub async fn bind(self, addr: &Multiaddr) -> Result { + pub async fn bind(self, addr: &Multiaddr, tls_config: Option) -> Result { let mut iter = addr.iter(); let (tx_cancellation, rx_cancellation) = tokio::sync::oneshot::channel(); let rx_cancellation = rx_cancellation.map(|_| ()); - let (local_addr, server): (Multiaddr, BoxFuture<(), tonic::transport::Error>) = - match iter.next().ok_or_else(|| eyre!("malformed addr"))? { - Protocol::Dns(_) => { - let (dns_name, tcp_port, _http_or_https) = parse_dns(addr)?; - let (local_addr, incoming) = - tcp_listener_and_update_multiaddr(addr, (dns_name.as_ref(), tcp_port)) - .await?; - let server = Box::pin( - self.router - .serve_with_incoming_shutdown(incoming, rx_cancellation), - ); - (local_addr, server) - } - Protocol::Ip4(_) => { - let (socket_addr, _http_or_https) = parse_ip4(addr)?; - let (local_addr, incoming) = - tcp_listener_and_update_multiaddr(addr, socket_addr).await?; - let server = Box::pin( - self.router - .serve_with_incoming_shutdown(incoming, rx_cancellation), - ); - (local_addr, server) - } - Protocol::Ip6(_) => { - let (socket_addr, _http_or_https) = parse_ip6(addr)?; - let (local_addr, incoming) = - tcp_listener_and_update_multiaddr(addr, socket_addr).await?; - let server = Box::pin( - self.router - .serve_with_incoming_shutdown(incoming, rx_cancellation), - ); - (local_addr, server) - } - unsupported => return Err(eyre!("unsupported protocol {unsupported}")), - }; + let (local_addr, server): (Multiaddr, BoxFuture<(), tonic::transport::Error>) = match iter + .next() + .ok_or_else(|| eyre!("malformed addr"))? + { + Protocol::Dns(_) => { + let (dns_name, tcp_port, _http_or_https) = parse_dns(addr)?; + let (local_addr, incoming) = + listen_and_update_multiaddr(addr, (dns_name.to_string(), tcp_port), tls_config) + .await?; + let server = Box::pin( + self.router + .serve_with_incoming_shutdown(incoming, rx_cancellation), + ); + (local_addr, server) + } + Protocol::Ip4(_) => { + let (socket_addr, _http_or_https) = parse_ip4(addr)?; + let (local_addr, incoming) = + listen_and_update_multiaddr(addr, socket_addr, tls_config).await?; + let server = Box::pin( + self.router + .serve_with_incoming_shutdown(incoming, rx_cancellation), + ); + (local_addr, server) + } + Protocol::Ip6(_) => { + let (socket_addr, _http_or_https) = parse_ip6(addr)?; + let (local_addr, incoming) = + listen_and_update_multiaddr(addr, socket_addr, tls_config).await?; + let server = Box::pin( + self.router + .serve_with_incoming_shutdown(incoming, rx_cancellation), + ); + (local_addr, server) + } + unsupported => return Err(eyre!("unsupported protocol {unsupported}")), + }; Ok(Server { server, @@ -205,22 +213,156 @@ impl ServerBuilder { } } -async fn tcp_listener_and_update_multiaddr( +async fn listen_and_update_multiaddr( address: &Multiaddr, socket_addr: T, -) -> Result<(Multiaddr, TcpListenerStream)> { - let (local_addr, incoming) = tcp_listener(socket_addr).await?; + tls_config: Option, +) -> Result<( + Multiaddr, + impl Stream>, +)> { + let listener = TcpListener::bind(socket_addr).await?; + let local_addr = listener.local_addr()?; let local_addr = update_tcp_port_in_multiaddr(address, local_addr.port()); - Ok((local_addr, incoming)) + + let tls_acceptor = tls_config.map(|tls_config| TlsAcceptor::from(Arc::new(tls_config))); + let incoming = TcpOrTlsListener::new(listener, tls_acceptor); + let stream = async_stream::stream! { + let mut new_connections = FuturesUnordered::new(); + loop { + tokio::select! { + result = incoming.accept_raw() => { + match result { + Ok((stream, addr)) => { + new_connections.push(incoming.maybe_upgrade(stream, addr)); + } + Err(e) => yield Err(e), + } + } + Some(result) = new_connections.next() => { + yield result; + } + } + } + }; + + Ok((local_addr, stream)) } -async fn tcp_listener(address: T) -> Result<(SocketAddr, TcpListenerStream)> { - let listener = TcpListener::bind(address).await?; - let local_addr = listener.local_addr()?; - let incoming = TcpListenerStream::new(listener); - Ok((local_addr, incoming)) +pub struct TcpOrTlsListener { + listener: TcpListener, + tls_acceptor: Option, } +impl TcpOrTlsListener { + fn new(listener: TcpListener, tls_acceptor: Option) -> Self { + Self { + listener, + tls_acceptor, + } + } + + async fn accept_raw(&self) -> std::io::Result<(TcpStream, SocketAddr)> { + self.listener.accept().await + } + + async fn maybe_upgrade( + &self, + stream: TcpStream, + addr: SocketAddr, + ) -> std::io::Result { + if self.tls_acceptor.is_none() { + return Ok(TcpOrTlsStream::Tcp(stream, addr)); + } + + // Determine whether new connection is TLS. + let mut buf = [0; 1]; + // `peek` blocks until at least some data is available, so if there is no error then + // it must return the one byte we are requesting. + stream.peek(&mut buf).await?; + if buf[0] == 0x16 { + // First byte of a TLS handshake is 0x16. + debug!("accepting TLS connection from {addr:?}"); + let stream = self.tls_acceptor.as_ref().unwrap().accept(stream).await?; + Ok(TcpOrTlsStream::Tls(stream, addr)) + } else { + debug!("accepting TCP connection from {addr:?}"); + Ok(TcpOrTlsStream::Tcp(stream, addr)) + } + } +} + +pub enum TcpOrTlsStream { + Tcp(TcpStream, SocketAddr), + Tls(TlsStream, SocketAddr), +} + +impl AsyncRead for TcpOrTlsStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf, + ) -> Poll> { + match self.get_mut() { + TcpOrTlsStream::Tcp(stream, _) => Pin::new(stream).poll_read(cx, buf), + TcpOrTlsStream::Tls(stream, _) => Pin::new(stream).poll_read(cx, buf), + } + } +} + +impl AsyncWrite for TcpOrTlsStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match self.get_mut() { + TcpOrTlsStream::Tcp(stream, _) => Pin::new(stream).poll_write(cx, buf), + TcpOrTlsStream::Tls(stream, _) => Pin::new(stream).poll_write(cx, buf), + } + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.get_mut() { + TcpOrTlsStream::Tcp(stream, _) => Pin::new(stream).poll_flush(cx), + TcpOrTlsStream::Tls(stream, _) => Pin::new(stream).poll_flush(cx), + } + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.get_mut() { + TcpOrTlsStream::Tcp(stream, _) => Pin::new(stream).poll_shutdown(cx), + TcpOrTlsStream::Tls(stream, _) => Pin::new(stream).poll_shutdown(cx), + } + } +} + +impl tonic::transport::server::Connected for TcpOrTlsStream { + type ConnectInfo = tonic::transport::server::TcpConnectInfo; + + fn connect_info(&self) -> Self::ConnectInfo { + match self { + TcpOrTlsStream::Tcp(stream, addr) => Self::ConnectInfo { + local_addr: stream.local_addr().ok(), + remote_addr: Some(*addr), + }, + TcpOrTlsStream::Tls(stream, addr) => Self::ConnectInfo { + local_addr: stream.get_ref().0.local_addr().ok(), + remote_addr: Some(*addr), + }, + } + } +} + +/// TLS server name to use for the public Sui validator interface. +pub const SUI_TLS_SERVER_NAME: &str = "sui"; + pub struct Server { server: BoxFuture<(), tonic::transport::Error>, cancel_handle: Option>, @@ -318,14 +460,14 @@ mod test { let mut server = config .server_builder_with_metrics(metrics.clone()) - .bind(&address) + .bind(&address, None) .await .unwrap(); let address = server.local_addr().to_owned(); let cancel_handle = server.take_cancel_handle().unwrap(); let server_handle = tokio::spawn(server.serve()); - let channel = config.connect(&address).await.unwrap(); + let channel = config.connect(&address, None).await.unwrap(); let mut client = HealthClient::new(channel); client @@ -381,14 +523,14 @@ mod test { let mut server = config .server_builder_with_metrics(metrics.clone()) - .bind(&address) + .bind(&address, None) .await .unwrap(); let address = server.local_addr().to_owned(); let cancel_handle = server.take_cancel_handle().unwrap(); let server_handle = tokio::spawn(server.serve()); - let channel = config.connect(&address).await.unwrap(); + let channel = config.connect(&address, None).await.unwrap(); let mut client = HealthClient::new(channel); // Call the healthcheck for a service that doesn't exist @@ -408,11 +550,11 @@ mod test { async fn test_multiaddr(address: Multiaddr) { let config = Config::new(); - let mut server = config.server_builder().bind(&address).await.unwrap(); + let mut server = config.server_builder().bind(&address, None).await.unwrap(); let address = server.local_addr().to_owned(); let cancel_handle = server.take_cancel_handle().unwrap(); let server_handle = tokio::spawn(server.serve()); - let channel = config.connect(&address).await.unwrap(); + let channel = config.connect(&address, None).await.unwrap(); let mut client = HealthClient::new(channel); client diff --git a/crates/sui-core/Cargo.toml b/crates/sui-core/Cargo.toml index 2882e2e33cb6b..27c84857f6878 100644 --- a/crates/sui-core/Cargo.toml +++ b/crates/sui-core/Cargo.toml @@ -88,6 +88,7 @@ sui-protocol-config.workspace = true sui-transaction-checks.workspace = true sui-simulator.workspace = true sui-storage.workspace = true +sui-tls.workspace = true sui-types.workspace = true zeroize.workspace = true nonempty.workspace = true diff --git a/crates/sui-core/src/authority_client.rs b/crates/sui-core/src/authority_client.rs index 78461b0656476..9537dec300c71 100644 --- a/crates/sui-core/src/authority_client.rs +++ b/crates/sui-core/src/authority_client.rs @@ -11,6 +11,7 @@ use std::time::Duration; use sui_network::{api::ValidatorClient, tonic}; use sui_types::base_types::AuthorityName; use sui_types::committee::CommitteeWithNetworkMetadata; +use sui_types::crypto::NetworkPublicKey; use sui_types::messages_checkpoint::{ CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2, }; @@ -97,15 +98,32 @@ pub struct NetworkAuthorityClient { } impl NetworkAuthorityClient { - pub async fn connect(address: &Multiaddr) -> anyhow::Result { - let channel = mysten_network::client::connect(address) + pub async fn connect( + address: &Multiaddr, + tls_target: Option, + ) -> anyhow::Result { + let tls_config = tls_target.map(|tls_target| { + sui_tls::create_rustls_client_config( + tls_target, + sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(), + None, + ) + }); + let channel = mysten_network::client::connect(address, tls_config) .await .map_err(|err| anyhow!(err.to_string()))?; Ok(Self::new(channel)) } - pub fn connect_lazy(address: &Multiaddr) -> Self { - let client: SuiResult<_> = mysten_network::client::connect_lazy(address) + pub fn connect_lazy(address: &Multiaddr, tls_target: Option) -> Self { + let tls_config = tls_target.map(|tls_target| { + sui_tls::create_rustls_client_config( + tls_target, + sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(), + None, + ) + }); + let client: SuiResult<_> = mysten_network::client::connect_lazy(address, tls_config) .map(ValidatorClient::new) .map_err(|err| err.to_string().into()); Self { client } @@ -265,7 +283,16 @@ pub fn make_network_authority_clients_with_network_config( for (name, (_state, network_metadata)) in committee.validators() { let address = network_metadata.network_address.clone(); let address = address.rewrite_udp_to_tcp(); - let maybe_channel = network_config.connect_lazy(&address).map_err(|e| { + // TODO: Enable TLS on this interface with below config, once support is rolled out to validators. + // let tls_config = network_metadata.network_public_key.as_ref().map(|key| { + // sui_tls::create_rustls_client_config( + // key.clone(), + // sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(), + // None, + // ) + // }); + // TODO: Change below code to generate a SuiError if no valid TLS config is available. + let maybe_channel = network_config.connect_lazy(&address, None).map_err(|e| { tracing::error!( address = %address, name = %name, diff --git a/crates/sui-core/src/authority_server.rs b/crates/sui-core/src/authority_server.rs index ea093886942dc..4013a5c948ebb 100644 --- a/crates/sui-core/src/authority_server.rs +++ b/crates/sui-core/src/authority_server.rs @@ -4,7 +4,9 @@ use anyhow::Result; use async_trait::async_trait; +use fastcrypto::traits::KeyPair; use mysten_metrics::spawn_monitored_task; +use mysten_network::server::SUI_TLS_SERVER_NAME; use prometheus::{ register_histogram_with_registry, register_int_counter_vec_with_registry, register_int_counter_with_registry, Histogram, IntCounter, IntCounterVec, Registry, @@ -146,6 +148,11 @@ impl AuthorityServer { self, address: Multiaddr, ) -> Result { + let tls_config = sui_tls::create_rustls_server_config( + self.state.config.network_key_pair().copy().private(), + SUI_TLS_SERVER_NAME.to_string(), + sui_tls::AllowAll, + ); let mut server = mysten_network::config::Config::new() .server_builder() .add_service(ValidatorServer::new(ValidatorService::new_for_tests( @@ -153,7 +160,7 @@ impl AuthorityServer { self.consensus_adapter, self.metrics, ))) - .bind(&address) + .bind(&address, Some(tls_config)) .await .unwrap(); let local_addr = server.local_addr().to_owned(); diff --git a/crates/sui-core/src/unit_tests/authority_tests.rs b/crates/sui-core/src/unit_tests/authority_tests.rs index e351164130cd7..86651b7221293 100644 --- a/crates/sui-core/src/unit_tests/authority_tests.rs +++ b/crates/sui-core/src/unit_tests/authority_tests.rs @@ -1198,9 +1198,18 @@ async fn test_handle_transfer_transaction_bad_signature() { let server_handle = server.spawn_for_test().await.unwrap(); - let client = NetworkAuthorityClient::connect(server_handle.address()) - .await - .unwrap(); + let client = NetworkAuthorityClient::connect( + server_handle.address(), + Some( + authority_state + .config + .network_key_pair() + .public() + .to_owned(), + ), + ) + .await + .unwrap(); let (_unknown_address, unknown_key): (_, AccountKeyPair) = get_key_pair(); let mut bad_signature_transfer_transaction = transfer_transaction.clone().into_inner(); diff --git a/crates/sui-core/src/unit_tests/consensus_tests.rs b/crates/sui-core/src/unit_tests/consensus_tests.rs index 70eb5791739af..d0d5b654a87e0 100644 --- a/crates/sui-core/src/unit_tests/consensus_tests.rs +++ b/crates/sui-core/src/unit_tests/consensus_tests.rs @@ -11,19 +11,14 @@ use crate::mock_consensus::with_block_status; use consensus_core::{BlockRef, BlockStatus}; use fastcrypto::traits::KeyPair; use move_core_types::{account_address::AccountAddress, ident_str}; -use narwhal_types::Transactions; -use narwhal_types::TransactionsServer; -use narwhal_types::{Empty, TransactionProto}; use parking_lot::Mutex; use rand::rngs::StdRng; use rand::SeedableRng; -use sui_network::tonic; use sui_types::crypto::{deterministic_random_account_key, AccountKeyPair}; use sui_types::gas::GasCostSummary; use sui_types::messages_checkpoint::{ CheckpointContents, CheckpointSignatureMessage, CheckpointSummary, SignedCheckpointSummary, }; -use sui_types::multiaddr::Multiaddr; use sui_types::utils::{make_committee_key, to_sender_signed_transaction}; use sui_types::SUI_FRAMEWORK_PACKAGE_ID; use sui_types::{ @@ -34,8 +29,6 @@ use sui_types::{ TEST_ONLY_GAS_UNIT_FOR_OBJECT_BASICS, }, }; -use tokio::sync::mpsc::channel; -use tokio::sync::mpsc::{Receiver, Sender}; /// Fixture: a few test gas objects. pub fn test_gas_objects() -> Vec { @@ -444,45 +437,3 @@ async fn submit_checkpoint_signature_to_consensus_adapter() { .unwrap(); waiter.await.unwrap(); } - -pub struct ConsensusMockServer { - sender: Sender, -} - -impl ConsensusMockServer { - pub fn spawn(address: Multiaddr) -> Receiver { - let (sender, receiver) = channel(1); - tokio::spawn(async move { - let config = mysten_network::config::Config::new(); - let mock = Self { sender }; - config - .server_builder() - .add_service(TransactionsServer::new(mock)) - .bind(&address) - .await - .unwrap() - .serve() - .await - }); - receiver - } -} - -#[tonic::async_trait] -impl Transactions for ConsensusMockServer { - /// Submit a Transactions - async fn submit_transaction( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - self.sender.send(request.into_inner()).await.unwrap(); - Ok(tonic::Response::new(Empty {})) - } - /// Submit a Transactions - async fn submit_transaction_stream( - &self, - _request: tonic::Request>, - ) -> Result, tonic::Status> { - unimplemented!() - } -} diff --git a/crates/sui-core/src/unit_tests/server_tests.rs b/crates/sui-core/src/unit_tests/server_tests.rs index 0a2e83627b26d..fce54971bc7f3 100644 --- a/crates/sui-core/src/unit_tests/server_tests.rs +++ b/crates/sui-core/src/unit_tests/server_tests.rs @@ -19,13 +19,22 @@ async fn test_simple_request() { let authority_state = init_state_with_object_id(sender, object_id).await; // The following two fields are only needed for shared objects (not by this bench). - let server = AuthorityServer::new_for_test(authority_state); + let server = AuthorityServer::new_for_test(authority_state.clone()); let server_handle = server.spawn_for_test().await.unwrap(); - let client = NetworkAuthorityClient::connect(server_handle.address()) - .await - .unwrap(); + let client = NetworkAuthorityClient::connect( + server_handle.address(), + Some( + authority_state + .config + .network_key_pair() + .public() + .to_owned(), + ), + ) + .await + .unwrap(); let req = ObjectInfoRequest::latest_object_info_request(object_id, LayoutGenerationOption::Generate); diff --git a/crates/sui-core/src/unit_tests/transaction_tests.rs b/crates/sui-core/src/unit_tests/transaction_tests.rs index 14b60bdbdbfdf..f8e4d739ddbfe 100644 --- a/crates/sui-core/src/unit_tests/transaction_tests.rs +++ b/crates/sui-core/src/unit_tests/transaction_tests.rs @@ -442,9 +442,18 @@ async fn do_transaction_test_impl( let server_handle = server.spawn_for_test().await.unwrap(); - let client = NetworkAuthorityClient::connect(server_handle.address()) - .await - .unwrap(); + let client = NetworkAuthorityClient::connect( + server_handle.address(), + Some( + authority_state + .config + .network_key_pair() + .public() + .to_owned(), + ), + ) + .await + .unwrap(); post_sign_mutations(&mut transfer_transaction); post_sign_mutations(&mut move_call_transaction); @@ -1035,9 +1044,18 @@ async fn setup_zklogin_network( let server_handle = server.spawn_for_test().await.unwrap(); - let client = NetworkAuthorityClient::connect(server_handle.address()) - .await - .unwrap(); + let client = NetworkAuthorityClient::connect( + server_handle.address(), + Some( + authority_state + .config + .network_key_pair() + .public() + .to_owned(), + ), + ) + .await + .unwrap(); ( object_ids, gas_object_ids, @@ -1328,9 +1346,18 @@ async fn execute_transaction_assert_err( let server_handle = server.spawn_for_test().await.unwrap(); - let client = NetworkAuthorityClient::connect(server_handle.address()) - .await - .unwrap(); + let client = NetworkAuthorityClient::connect( + server_handle.address(), + Some( + authority_state + .config + .network_key_pair() + .public() + .to_owned(), + ), + ) + .await + .unwrap(); let err = client .handle_transaction(txn.clone(), Some(make_socket_addr())) .await; @@ -1380,9 +1407,18 @@ async fn test_oversized_txn() { let server_handle = server.spawn_for_test().await.unwrap(); - let client = NetworkAuthorityClient::connect(server_handle.address()) - .await - .unwrap(); + let client = NetworkAuthorityClient::connect( + server_handle.address(), + Some( + authority_state + .config + .network_key_pair() + .public() + .to_owned(), + ), + ) + .await + .unwrap(); let res = client .handle_transaction(txn, Some(make_socket_addr())) @@ -1431,9 +1467,18 @@ async fn test_very_large_certificate() { let server_handle = server.spawn_for_test().await.unwrap(); - let client = NetworkAuthorityClient::connect(server_handle.address()) - .await - .unwrap(); + let client = NetworkAuthorityClient::connect( + server_handle.address(), + Some( + authority_state + .config + .network_key_pair() + .public() + .to_owned(), + ), + ) + .await + .unwrap(); let socket_addr = make_socket_addr(); let auth_sig = client @@ -1513,9 +1558,18 @@ async fn test_handle_certificate_errors() { let server_handle = server.spawn_for_test().await.unwrap(); - let client = NetworkAuthorityClient::connect(server_handle.address()) - .await - .unwrap(); + let client = NetworkAuthorityClient::connect( + server_handle.address(), + Some( + authority_state + .config + .network_key_pair() + .public() + .to_owned(), + ), + ) + .await + .unwrap(); // Test handle certificate from the wrong epoch let epoch_store = authority_state.epoch_store_for_testing(); @@ -1688,9 +1742,12 @@ async fn test_handle_soft_bundle_certificates() { let server = AuthorityServer::new_for_test_with_consensus_adapter(authority.clone(), adapter); let _metrics = server.metrics.clone(); let server_handle = server.spawn_for_test().await.unwrap(); - let client = NetworkAuthorityClient::connect(server_handle.address()) - .await - .unwrap(); + let client = NetworkAuthorityClient::connect( + server_handle.address(), + Some(authority.config.network_key_pair().public().to_owned()), + ) + .await + .unwrap(); let signed_tx_into_certificate = |transaction: Transaction| async { let epoch_store = authority.load_epoch_store_one_call_per_task(); @@ -1843,9 +1900,12 @@ async fn test_handle_soft_bundle_certificates_errors() { let authority = server.state.clone(); let _metrics = server.metrics.clone(); let server_handle = server.spawn_for_test().await.unwrap(); - let client = NetworkAuthorityClient::connect(server_handle.address()) - .await - .unwrap(); + let client = NetworkAuthorityClient::connect( + server_handle.address(), + Some(authority.config.network_key_pair().public().to_owned()), + ) + .await + .unwrap(); let signed_tx_into_certificate = |transaction: Transaction| async { let epoch_store = authority.load_epoch_store_one_call_per_task(); diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index 3357d527cf226..cb7c342f396f7 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -13,6 +13,7 @@ use arc_swap::ArcSwap; use fastcrypto_zkp::bn254::zk_login::JwkId; use fastcrypto_zkp::bn254::zk_login::OIDCProvider; use futures::TryFutureExt; +use mysten_network::server::SUI_TLS_SERVER_NAME; use prometheus::Registry; use std::collections::{BTreeSet, HashMap, HashSet}; use std::fmt; @@ -1474,8 +1475,13 @@ impl SuiNode { server_builder = server_builder.add_service(ValidatorServer::new(validator_service)); + let tls_config = sui_tls::create_rustls_server_config( + config.network_key_pair().copy().private(), + SUI_TLS_SERVER_NAME.to_string(), + sui_tls::AllowAll, + ); let server = server_builder - .bind(config.network_address()) + .bind(config.network_address(), Some(tls_config)) .await .map_err(|err| anyhow!(err.to_string()))?; let local_addr = server.local_addr(); diff --git a/crates/sui-swarm/Cargo.toml b/crates/sui-swarm/Cargo.toml index 51caeadf3b797..6823246c0cd0a 100644 --- a/crates/sui-swarm/Cargo.toml +++ b/crates/sui-swarm/Cargo.toml @@ -25,6 +25,7 @@ sui-swarm-config.workspace = true sui-macros.workspace = true sui-node.workspace = true sui-protocol-config.workspace = true +sui-tls.workspace = true sui-types.workspace = true mysten-metrics.workspace = true mysten-network.workspace = true diff --git a/crates/sui-swarm/src/memory/node.rs b/crates/sui-swarm/src/memory/node.rs index 5cc10a6b1f7b5..541c2bc962850 100644 --- a/crates/sui-swarm/src/memory/node.rs +++ b/crates/sui-swarm/src/memory/node.rs @@ -9,6 +9,7 @@ use sui_config::NodeConfig; use sui_node::SuiNodeHandle; use sui_types::base_types::AuthorityName; use sui_types::base_types::ConciseableName; +use sui_types::crypto::KeypairTraits; use tap::TapFallible; use tracing::{error, info}; @@ -106,7 +107,12 @@ impl Node { if is_validator { let network_address = self.config().network_address().clone(); - let channel = mysten_network::client::connect(&network_address) + let tls_config = sui_tls::create_rustls_client_config( + self.config().network_key_pair().public().to_owned(), + sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(), + None, + ); + let channel = mysten_network::client::connect(&network_address, Some(tls_config)) .await .map_err(|err| anyhow!(err.to_string())) .map_err(HealthCheckError::Failure) diff --git a/crates/sui-tls/src/lib.rs b/crates/sui-tls/src/lib.rs index 7b3b9c23f5796..7f40317d43303 100644 --- a/crates/sui-tls/src/lib.rs +++ b/crates/sui-tls/src/lib.rs @@ -5,10 +5,9 @@ mod acceptor; mod certgen; mod verifier; -pub const SUI_VALIDATOR_SERVER_NAME: &str = "sui"; - pub use acceptor::{TlsAcceptor, TlsConnectionInfo}; pub use certgen::SelfSignedCertificate; +use rustls::ClientConfig; pub use verifier::{ public_key_from_certificate, AllowAll, AllowPublicKeys, Allower, ClientCertVerifier, ServerCertVerifier, @@ -16,6 +15,46 @@ pub use verifier::{ pub use rustls; +use fastcrypto::ed25519::{Ed25519PrivateKey, Ed25519PublicKey}; +use tokio_rustls::rustls::ServerConfig; + +pub const SUI_VALIDATOR_SERVER_NAME: &str = "sui"; + +pub fn create_rustls_server_config( + private_key: Ed25519PrivateKey, + server_name: String, + allower: A, +) -> ServerConfig { + let verifier = ClientCertVerifier::new(allower, server_name.clone()); + // TODO: refactor to use key bytes + let self_signed_cert = SelfSignedCertificate::new(private_key, server_name.as_str()); + let tls_cert = self_signed_cert.rustls_certificate(); + let tls_private_key = self_signed_cert.rustls_private_key(); + let mut tls_config = verifier + .rustls_server_config(vec![tls_cert], tls_private_key) + .unwrap_or_else(|e| panic!("Failed to create TLS server config: {:?}", e)); + tls_config.alpn_protocols = vec![b"h2".to_vec()]; + tls_config +} + +pub fn create_rustls_client_config( + target_public_key: Ed25519PublicKey, + server_name: String, + client_key: Option, // optional self-signed cert for client verification +) -> ClientConfig { + let tls_config = ServerCertVerifier::new(target_public_key, server_name.clone()); + let tls_config = if let Some(private_key) = client_key { + let self_signed_cert = SelfSignedCertificate::new(private_key, server_name.as_str()); + let tls_cert = self_signed_cert.rustls_certificate(); + let tls_private_key = self_signed_cert.rustls_private_key(); + tls_config.rustls_client_config_with_client_auth(vec![tls_cert], tls_private_key) + } else { + tls_config.rustls_client_config_with_no_client_auth() + } + .unwrap_or_else(|e| panic!("Failed to create TLS client config: {e:?}")); + tls_config +} + #[cfg(test)] mod tests { use std::collections::BTreeSet; diff --git a/crates/sui-tls/src/verifier.rs b/crates/sui-tls/src/verifier.rs index 562cc34d48973..b1e87fbf88823 100644 --- a/crates/sui-tls/src/verifier.rs +++ b/crates/sui-tls/src/verifier.rs @@ -178,20 +178,30 @@ impl ServerCertVerifier { Self { public_key, name } } - pub fn rustls_client_config( + pub fn rustls_client_config_with_client_auth( self, certificates: Vec>, private_key: PrivateKeyDer<'static>, ) -> Result { - let mut config = rustls::ClientConfig::builder_with_provider(Arc::new( + rustls::ClientConfig::builder_with_provider(Arc::new( rustls::crypto::ring::default_provider(), )) .with_safe_default_protocol_versions()? .dangerous() .with_custom_certificate_verifier(std::sync::Arc::new(self)) - .with_client_auth_cert(certificates, private_key)?; - config.alpn_protocols = vec![b"h2".to_vec()]; - Ok(config) + .with_client_auth_cert(certificates, private_key) + } + + pub fn rustls_client_config_with_no_client_auth( + self, + ) -> Result { + Ok(rustls::ClientConfig::builder_with_provider(Arc::new( + rustls::crypto::ring::default_provider(), + )) + .with_safe_default_protocol_versions()? + .dangerous() + .with_custom_certificate_verifier(std::sync::Arc::new(self)) + .with_no_client_auth()) } } diff --git a/crates/sui-tool/Cargo.toml b/crates/sui-tool/Cargo.toml index cb9bff954df3e..e2d0ce18338df 100644 --- a/crates/sui-tool/Cargo.toml +++ b/crates/sui-tool/Cargo.toml @@ -47,4 +47,5 @@ sui-storage.workspace = true sui-types.workspace = true sui-archival.workspace = true sui-package-dump.workspace = true +sui-tls.workspace = true bin-version.workspace = true diff --git a/crates/sui-tool/src/lib.rs b/crates/sui-tool/src/lib.rs index bfd26c5477475..f73e73a9eef5a 100644 --- a/crates/sui-tool/src/lib.rs +++ b/crates/sui-tool/src/lib.rs @@ -106,8 +106,14 @@ async fn make_clients( for validator in active_validators { let net_addr = Multiaddr::try_from(validator.net_address).unwrap(); + // TODO: Enable TLS on this interface with below config, once support is rolled out to validators. + // let tls_config = sui_tls::create_rustls_client_config( + // sui_types::crypto::NetworkPublicKey::from_bytes(&validator.network_pubkey_bytes)?, + // sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(), + // None, + // ); let channel = net_config - .connect_lazy(&net_addr) + .connect_lazy(&net_addr, None) .map_err(|err| anyhow!(err.to_string()))?; let client = NetworkAuthorityClient::new(channel); let public_key_bytes = diff --git a/crates/sui-types/src/committee.rs b/crates/sui-types/src/committee.rs index 906fd6ba1c94c..fea794ffd259c 100644 --- a/crates/sui-types/src/committee.rs +++ b/crates/sui-types/src/committee.rs @@ -3,7 +3,9 @@ // SPDX-License-Identifier: Apache-2.0 use super::base_types::*; -use crate::crypto::{random_committee_key_pairs_of_size, AuthorityKeyPair, AuthorityPublicKey}; +use crate::crypto::{ + random_committee_key_pairs_of_size, AuthorityKeyPair, AuthorityPublicKey, NetworkPublicKey, +}; use crate::error::{SuiError, SuiResult}; use crate::multiaddr::Multiaddr; use fastcrypto::traits::KeyPair; @@ -353,18 +355,17 @@ pub trait CommitteeTrait { fn weight(&self, author: &K) -> StakeUnit; } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug)] pub struct NetworkMetadata { pub network_address: Multiaddr, pub narwhal_primary_address: Multiaddr, + pub network_public_key: Option, } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug)] pub struct CommitteeWithNetworkMetadata { epoch_id: EpochId, validators: BTreeMap, - - #[serde(skip)] committee: OnceCell, } diff --git a/crates/sui-types/src/sui_system_state/epoch_start_sui_system_state.rs b/crates/sui-types/src/sui_system_state/epoch_start_sui_system_state.rs index 9c7736ec470b9..118f15c89b420 100644 --- a/crates/sui-types/src/sui_system_state/epoch_start_sui_system_state.rs +++ b/crates/sui-types/src/sui_system_state/epoch_start_sui_system_state.rs @@ -159,6 +159,7 @@ impl EpochStartSystemStateTrait for EpochStartSystemStateV1 { NetworkMetadata { network_address: validator.sui_net_address.clone(), narwhal_primary_address: validator.narwhal_primary_address.clone(), + network_public_key: Some(validator.narwhal_network_pubkey.clone()), }, ), ) diff --git a/crates/sui-types/src/sui_system_state/simtest_sui_system_state_inner.rs b/crates/sui-types/src/sui_system_state/simtest_sui_system_state_inner.rs index 0a365fec9c972..380ce1708bda5 100644 --- a/crates/sui-types/src/sui_system_state/simtest_sui_system_state_inner.rs +++ b/crates/sui-types/src/sui_system_state/simtest_sui_system_state_inner.rs @@ -175,6 +175,7 @@ impl SuiSystemStateTrait for SimTestSuiSystemStateInnerV1 { NetworkMetadata { network_address: verified_metadata.net_address.clone(), narwhal_primary_address: verified_metadata.primary_address.clone(), + network_public_key: Some(verified_metadata.network_pubkey.clone()), }, ), ) @@ -291,6 +292,7 @@ impl SuiSystemStateTrait for SimTestSuiSystemStateInnerShallowV2 { NetworkMetadata { network_address: verified_metadata.net_address.clone(), narwhal_primary_address: verified_metadata.primary_address.clone(), + network_public_key: Some(verified_metadata.network_pubkey.clone()), }, ), ) @@ -436,6 +438,7 @@ impl SuiSystemStateTrait for SimTestSuiSystemStateInnerDeepV2 { NetworkMetadata { network_address: verified_metadata.net_address.clone(), narwhal_primary_address: verified_metadata.primary_address.clone(), + network_public_key: Some(verified_metadata.network_pubkey.clone()), }, ), ) diff --git a/crates/sui-types/src/sui_system_state/sui_system_state_inner_v1.rs b/crates/sui-types/src/sui_system_state/sui_system_state_inner_v1.rs index 11b8565b47e65..c1cd3056388b5 100644 --- a/crates/sui-types/src/sui_system_state/sui_system_state_inner_v1.rs +++ b/crates/sui-types/src/sui_system_state/sui_system_state_inner_v1.rs @@ -552,6 +552,7 @@ impl SuiSystemStateTrait for SuiSystemStateInnerV1 { NetworkMetadata { network_address: verified_metadata.net_address.clone(), narwhal_primary_address: verified_metadata.primary_address.clone(), + network_public_key: Some(verified_metadata.network_pubkey.clone()), }, ), ) diff --git a/crates/sui-types/src/sui_system_state/sui_system_state_inner_v2.rs b/crates/sui-types/src/sui_system_state/sui_system_state_inner_v2.rs index f0863c2119466..1b8ce5f75d6b9 100644 --- a/crates/sui-types/src/sui_system_state/sui_system_state_inner_v2.rs +++ b/crates/sui-types/src/sui_system_state/sui_system_state_inner_v2.rs @@ -132,6 +132,7 @@ impl SuiSystemStateTrait for SuiSystemStateInnerV2 { NetworkMetadata { network_address: verified_metadata.net_address.clone(), narwhal_primary_address: verified_metadata.primary_address.clone(), + network_public_key: Some(verified_metadata.network_pubkey.clone()), }, ), ) diff --git a/crates/sui-types/src/sui_system_state/sui_system_state_summary.rs b/crates/sui-types/src/sui_system_state/sui_system_state_summary.rs index 0b76e4c344b02..525650a730157 100644 --- a/crates/sui-types/src/sui_system_state/sui_system_state_summary.rs +++ b/crates/sui-types/src/sui_system_state/sui_system_state_summary.rs @@ -4,6 +4,7 @@ use super::{SuiSystemState, SuiSystemStateTrait}; use crate::base_types::{AuthorityName, ObjectID, SuiAddress}; use crate::committee::{CommitteeWithNetworkMetadata, NetworkMetadata}; +use crate::crypto::NetworkPublicKey; use crate::dynamic_field::get_dynamic_field_from_store; use crate::error::SuiError; use crate::id::ID; @@ -202,6 +203,10 @@ impl SuiSystemStateSummary { validator.primary_address.clone(), ) .unwrap(), + network_public_key: NetworkPublicKey::from_bytes( + &validator.network_pubkey_bytes, + ) + .ok(), }, ), ) diff --git a/narwhal/executor/tests/consensus_integration_tests.rs b/narwhal/executor/tests/consensus_integration_tests.rs deleted file mode 100644 index 9bebf74c0e3c7..0000000000000 --- a/narwhal/executor/tests/consensus_integration_tests.rs +++ /dev/null @@ -1,238 +0,0 @@ -// Copyright (c) Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 -use bytes::Bytes; -use fastcrypto::hash::Hash; -use narwhal_executor::get_restored_consensus_output; -use narwhal_executor::MockExecutionState; -use primary::consensus::{ - Bullshark, Consensus, ConsensusMetrics, ConsensusRound, LeaderSchedule, LeaderSwapTable, -}; -use primary::NUM_SHUTDOWN_RECEIVERS; -use prometheus::Registry; -use std::collections::BTreeSet; -use std::sync::Arc; -use storage::NodeStorage; -use telemetry_subscribers::TelemetryGuards; -use test_utils::latest_protocol_version; -use test_utils::{cluster::Cluster, temp_dir, CommitteeFixture}; -use tokio::sync::watch; - -use types::{Certificate, PreSubscribedBroadcastSender, Round, TransactionProto}; - -#[tokio::test] -async fn test_recovery() { - // Create storage - let storage = NodeStorage::reopen(temp_dir(), None); - - let consensus_store = storage.consensus_store; - let certificate_store = storage.certificate_store; - - // Setup consensus - let fixture = CommitteeFixture::builder().build(); - let committee = fixture.committee(); - - // Make certificates for rounds 1 and 2. - let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect(); - let genesis = Certificate::genesis(&latest_protocol_version(), &committee) - .iter() - .map(|x| x.digest()) - .collect::>(); - let (mut certificates, next_parents) = test_utils::make_optimal_certificates( - &committee, - &latest_protocol_version(), - 1..=2, - &genesis, - &ids, - ); - - // Make two certificate (f+1) with round 3 to trigger the commits. - let (_, certificate) = test_utils::mock_certificate( - &committee, - &latest_protocol_version(), - ids[0], - 3, - next_parents.clone(), - ); - certificates.push_back(certificate); - let (_, certificate) = test_utils::mock_certificate( - &committee, - &latest_protocol_version(), - ids[1], - 3, - next_parents, - ); - certificates.push_back(certificate); - - // Spawn the consensus engine and sink the primary channel. - let (tx_waiter, rx_waiter) = test_utils::test_channel!(1); - let (tx_primary, mut rx_primary) = test_utils::test_channel!(1); - let (tx_output, mut rx_output) = test_utils::test_channel!(1); - let (tx_consensus_round_updates, _rx_consensus_round_updates) = - watch::channel(ConsensusRound::default()); - - let mut tx_shutdown = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS); - - const GC_DEPTH: Round = 50; - const NUM_SUB_DAGS_PER_SCHEDULE: u64 = 100; - let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); - let bullshark = Bullshark::new( - committee.clone(), - consensus_store.clone(), - latest_protocol_version(), - metrics.clone(), - NUM_SUB_DAGS_PER_SCHEDULE, - LeaderSchedule::new(committee.clone(), LeaderSwapTable::default()), - ); - - let _consensus_handle = Consensus::spawn( - committee, - GC_DEPTH, - consensus_store.clone(), - certificate_store.clone(), - tx_shutdown.subscribe(), - rx_waiter, - tx_primary, - tx_consensus_round_updates, - tx_output, - bullshark, - metrics, - ); - tokio::spawn(async move { while rx_primary.recv().await.is_some() {} }); - - // Feed all certificates to the consensus. Only the last certificate should trigger - // commits, so the task should not block. - while let Some(certificate) = certificates.pop_front() { - // we store the certificates so we can enable the recovery - // mechanism later. - certificate_store.write(certificate.clone()).unwrap(); - tx_waiter.send(certificate).await.unwrap(); - } - - // Ensure the first 4 ordered certificates are from round 1 (they are the parents of the committed - // leader); then the leader's certificate should be committed. - let consensus_index_counter = 4; - let num_of_committed_certificates = 5; - - let committed_sub_dag = rx_output.recv().await.unwrap(); - let mut sequence = committed_sub_dag.certificates.into_iter(); - for i in 1..=num_of_committed_certificates { - let output = sequence.next().unwrap(); - - if i < 5 { - assert_eq!(output.round(), 1); - } else { - assert_eq!(output.round(), 2); - } - } - - // Now assume that we want to recover from a crash. We are testing all the recovery cases - // from having executed no certificates at all (or certificate with index = 0), up to - // have executed the last committed certificate - for last_executed_certificate_index in 0..consensus_index_counter { - let mut execution_state = MockExecutionState::new(); - execution_state - .expect_last_executed_sub_dag_index() - .times(1) - .returning(|| 1); - - let consensus_output = get_restored_consensus_output( - consensus_store.clone(), - certificate_store.clone(), - &execution_state, - ) - .await - .unwrap(); - - // we expect to have recovered all the certificates from the last commit. The Sui executor engine - // will not execute twice the same certificate. - assert_eq!(consensus_output.len(), 1); - assert!( - consensus_output[0].len() - >= (num_of_committed_certificates - last_executed_certificate_index) as usize - ); - } -} - -#[tokio::test] -async fn test_internal_consensus_output() { - // Enabled debug tracing so we can easily observe the - // nodes logs. - let _guard = setup_tracing(); - - let mut cluster = Cluster::new(None); - - // start the cluster - cluster.start(Some(4), Some(1), None).await; - - // get a client to send transactions - let worker_id = 0; - - let authority = cluster.authority(0); - let mut client = authority.new_transactions_client(&worker_id).await; - - // Subscribe to the transaction confirmation channel - let mut receiver = authority - .primary() - .await - .tx_transaction_confirmation - .subscribe(); - - // Create arbitrary transactions - let mut transactions = Vec::new(); - - const NUM_OF_TRANSACTIONS: u32 = 10; - for i in 0..NUM_OF_TRANSACTIONS { - let tx = string_transaction(i); - - // serialise and send - let tr = bcs::to_bytes(&tx).unwrap(); - let txn = TransactionProto { - transactions: vec![Bytes::from(tr)], - }; - client.submit_transaction(txn).await.unwrap(); - - transactions.push(tx); - } - - // wait for transactions to complete - loop { - let result = receiver.recv().await.unwrap(); - - // deserialise transaction - let output_transaction = bcs::from_bytes::(&result).unwrap(); - - // we always remove the first transaction and check with the one - // sequenced. We want the transactions to be sequenced in the - // same order as we post them. - let expected_transaction = transactions.remove(0); - - assert_eq!( - expected_transaction, output_transaction, - "Expected to have received transaction with same id. Ordering is important" - ); - - if transactions.is_empty() { - break; - } - } -} - -fn string_transaction(id: u32) -> String { - format!("test transaction:{id}") -} - -fn setup_tracing() -> TelemetryGuards { - // Setup tracing - let tracing_level = "debug"; - let network_tracing_level = "info"; - - let log_filter = format!("{tracing_level},h2={network_tracing_level},tower={network_tracing_level},hyper={network_tracing_level},tonic::transport={network_tracing_level}"); - - telemetry_subscribers::TelemetryConfig::new() - // load env variables - .with_env() - // load special log filter - .with_log_level(&log_filter) - .init() - .0 -} diff --git a/narwhal/primary/tests/causal_completion_tests.rs b/narwhal/primary/tests/causal_completion_tests.rs deleted file mode 100644 index d564f9ea39f4d..0000000000000 --- a/narwhal/primary/tests/causal_completion_tests.rs +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright (c) Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 -use bytes::Bytes; -use std::time::Duration; -use test_utils::cluster::{setup_tracing, Cluster}; -use tracing::info; -use types::TransactionProto; - -type StringTransaction = String; - -#[ignore] -#[tokio::test] -async fn test_restore_from_disk() { - // Enabled debug tracing so we can easily observe the - // nodes logs. - let _guard = setup_tracing(); - - let mut cluster = Cluster::new(None); - - // start the cluster - cluster.start(Some(4), Some(1), None).await; - - let id = 0; - let client = cluster.authority(0).new_transactions_client(&id).await; - - // Subscribe to the transaction confirmation channel - let mut receiver = cluster - .authority(0) - .primary() - .await - .tx_transaction_confirmation - .subscribe(); - - // Create arbitrary transactions - let mut total_tx = 3; - for tx in [ - string_transaction(), - string_transaction(), - string_transaction(), - ] { - let mut c = client.clone(); - tokio::spawn(async move { - let tr = bcs::to_bytes(&tx).unwrap(); - let txn = TransactionProto { - transactions: vec![Bytes::from(tr)], - }; - - c.submit_transaction(txn).await.unwrap(); - }); - } - - // wait for transactions to complete - loop { - if let Ok(_result) = receiver.recv().await { - total_tx -= 1; - if total_tx < 1 { - break; - } - } - } - - // Now stop node 0 - cluster.stop_node(0).await; - - // Let other primaries advance and primary 0 releases its port. - tokio::time::sleep(Duration::from_secs(10)).await; - - // Now start the node 0 again - cluster.start_node(0, true, Some(1)).await; - - // Let the node recover - tokio::time::sleep(Duration::from_secs(2)).await; - - let node = cluster.authority(0); - - // Check the metrics to ensure the node was recovered from disk - let primary = node.primary().await; - - let node_recovered_state = - if let Some(metric) = primary.metric("recovered_consensus_state").await { - let value = metric.get_counter().get_value(); - info!("Found metric for recovered consensus state."); - - value > 0.0 - } else { - false - }; - - assert!(node_recovered_state, "Node did not recover state from disk"); -} - -fn string_transaction() -> StringTransaction { - StringTransaction::from("test transaction") -} - -#[ignore] -#[tokio::test] -async fn test_read_causal_signed_certificates() { - const CURRENT_ROUND_METRIC: &str = "current_round"; - - // Enabled debug tracing so we can easily observe the - // nodes logs. - let _guard = setup_tracing(); - - let mut cluster = Cluster::new(None); - - // start the cluster - cluster.start(Some(4), Some(1), None).await; - - // Let primaries advance little bit - tokio::time::sleep(Duration::from_secs(10)).await; - - // Ensure all nodes advanced - for authority in cluster.authorities().await { - if let Some(metric) = authority.primary().await.metric(CURRENT_ROUND_METRIC).await { - let value = metric.get_gauge().get_value(); - - info!("Metric -> {:?}", value); - - // If the current round is increasing then it means that the - // node starts catching up and is proposing. - assert!(value > 1.0, "Node didn't progress further than the round 1"); - } - } - - // Now stop node 0 - cluster.stop_node(0).await; - - // Let other primaries advance and primary 0 releases its port. - tokio::time::sleep(Duration::from_secs(10)).await; - - // Now start the validator 0 again - cluster.start_node(0, true, Some(1)).await; - - // Now check that the current round advances. Give the opportunity with a few - // iterations. If metric hasn't picked up then we know that node can't make - // progress. - let mut node_made_progress = false; - let node = cluster.authority(0).primary().await; - - for _ in 0..10 { - tokio::time::sleep(Duration::from_secs(1)).await; - - if let Some(metric) = node.metric(CURRENT_ROUND_METRIC).await { - let value = metric.get_gauge().get_value(); - info!("Metric -> {:?}", value); - - // If the current round is increasing then it means that the - // node starts catching up and is proposing. - if value > 1.0 { - node_made_progress = true; - break; - } - } - } - - assert!( - node_made_progress, - "Node 0 didn't make progress - causal completion didn't succeed" - ); -} diff --git a/narwhal/primary/tests/nodes_bootstrapping_tests.rs b/narwhal/primary/tests/nodes_bootstrapping_tests.rs deleted file mode 100644 index 373676bfe7cb1..0000000000000 --- a/narwhal/primary/tests/nodes_bootstrapping_tests.rs +++ /dev/null @@ -1,299 +0,0 @@ -// Copyright (c) Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 -use bytes::Bytes; -use std::time::Duration; -use test_utils::cluster::{setup_tracing, Cluster}; -use types::TransactionProto; - -#[tokio::test(flavor = "current_thread", start_paused = true)] -async fn test_response_error_after_shutdown_internal_consensus() { - // Enabled debug tracing so we can easily observe the - // nodes logs. - let _guard = setup_tracing(); - - let delay = Duration::from_secs(10); // 10 seconds - - // A cluster of 4 nodes will be created, with internal consensus. - let cluster = Cluster::new(None); - - // ==== Start first authority ==== - let authority = cluster.authority(0); - authority.start(false, Some(1)).await; - - tokio::time::sleep(delay).await; - - authority.stop_all().await; - - tokio::time::sleep(delay).await; - - let worker_id = 0; - let mut client = authority.new_transactions_client(&worker_id).await; - - // Create a fake transaction - let tx_str = "test transaction".to_string(); - let tx = bcs::to_bytes(&tx_str).unwrap(); - let txn = TransactionProto { - transactions: vec![Bytes::from(tx)], - }; - - // Should fail submitting to consensus. - let Err(e) = client.submit_transaction(txn).await else { - panic!("Submitting transactions after Narwhal shutdown should fail!"); - }; - assert!(e.message().contains("tcp connect error:"), "Actual: {}", e); -} - -/// Nodes will be started in a staggered fashion. This is simulating -/// a real world scenario where nodes across validators will not start -/// in the same time. -#[ignore] -#[tokio::test] -async fn test_node_staggered_starts() { - // Enabled debug tracing so we can easily observe the - // nodes logs. - let _guard = setup_tracing(); - - let node_staggered_delay = Duration::from_secs(60 * 2); // 2 minutes - - // A cluster of 4 nodes will be created - let cluster = Cluster::new(None); - - // ==== Start first authority ==== - cluster.authority(0).start(false, Some(1)).await; - - tokio::time::sleep(node_staggered_delay).await; - - // No node should be able to commit, no reported round was expected - cluster.assert_progress(0, 0).await; - - // ==== Start second authority ==== - cluster.authority(1).start(false, Some(1)).await; - - tokio::time::sleep(node_staggered_delay).await; - - // No node should be able to commit, no reported round was expected - cluster.assert_progress(0, 0).await; - - // ==== Start third authority ==== - // Now 2f + 1 nodes are becoming available and we expect all the nodes to - // start making progress (advance in rounds). - cluster.authority(2).start(false, Some(1)).await; - - tokio::time::sleep(node_staggered_delay).await; - - // We have only (f) unavailable nodes, so all should have made progress and committed at least after the first round - cluster.assert_progress(3, 2).await; - - // ==== Start fourth authority ==== - // Now 3f + 1 nodes are becoming available (the whole network) and all the nodes - // should make progress - cluster.authority(3).start(false, Some(1)).await; - - tokio::time::sleep(node_staggered_delay).await; - - // All nodes are available so all should have made progress and committed at least after the first round - cluster.assert_progress(4, 2).await; -} - -/// All the nodes have an outage at the same time, when they recover, the rounds begin to advance. -#[ignore] -#[tokio::test] -async fn test_full_outage_and_recovery() { - let _guard = setup_tracing(); - - let stop_and_start_delay = Duration::from_secs(12); - let node_advance_delay = Duration::from_secs(60); - - // A cluster of 4 nodes will be created - let mut cluster = Cluster::new(None); - - // ===== Start the cluster ==== - cluster.start(Some(4), Some(1), None).await; - - // Let the nodes advance a bit - tokio::time::sleep(node_advance_delay).await; - - // Stop all the nodes - cluster.authority(0).stop_all().await; - tokio::time::sleep(stop_and_start_delay).await; - - cluster.authority(1).stop_all().await; - tokio::time::sleep(stop_and_start_delay).await; - - cluster.authority(2).stop_all().await; - tokio::time::sleep(stop_and_start_delay).await; - - cluster.authority(3).stop_all().await; - tokio::time::sleep(stop_and_start_delay).await; - - // Start all the nodes - cluster.authority(0).start(true, Some(1)).await; - tokio::time::sleep(stop_and_start_delay).await; - - cluster.authority(1).start(true, Some(1)).await; - tokio::time::sleep(stop_and_start_delay).await; - - cluster.authority(2).start(true, Some(1)).await; - tokio::time::sleep(stop_and_start_delay).await; - - cluster.authority(3).start(true, Some(1)).await; - - // now wait a bit to give the opportunity to recover - tokio::time::sleep(node_advance_delay).await; - - // Ensure that nodes have made progress - cluster.assert_progress(4, 2).await; -} - -#[ignore] -#[tokio::test] -async fn test_second_node_restart() { - // Enabled debug tracing so we can easily observe the - // nodes logs. - let _guard = setup_tracing(); - - let restart_delay = Duration::from_secs(120); - let node_advance_delay = Duration::from_secs(60); - - // A cluster of 4 nodes will be created - let mut cluster = Cluster::new(None); - - // ===== Start the cluster ==== - cluster.start(Some(4), Some(1), None).await; - - // Let the nodes advance a bit - tokio::time::sleep(node_advance_delay).await; - - // Now restart node 2 with some delay between - cluster.authority(2).restart(true, restart_delay).await; - - // now wait a bit to give the opportunity to recover - tokio::time::sleep(node_advance_delay).await; - - // Ensure that nodes have made progress - cluster.assert_progress(4, 2).await; - - // Now restart node 3 with some delay between - cluster.authority(3).restart(true, restart_delay).await; - - // now wait a bit to give the opportunity to recover - tokio::time::sleep(node_advance_delay).await; - - // Ensure that nodes have made progress - cluster.assert_progress(4, 2).await; -} - -#[ignore] -#[tokio::test] -/// We are testing the loss of liveness of a healthy cluster. While 3f+1 nodes run -/// we are shutting down f+1 nodes. Then we are bringing the f+1 nodes back again -/// We expect the restarted nodes to be able to make new proposals, and all the nodes -/// should be able to propose from where they left of at last round, and the rounds should -/// all advance. -async fn test_loss_of_liveness_without_recovery() { - // Enabled debug tracing so we can easily observe the - // nodes logs. - let _guard = setup_tracing(); - - let node_advance_delay = Duration::from_secs(60); - - // A cluster of 4 nodes will be created - let mut cluster = Cluster::new(None); - - // ===== Start the cluster ==== - cluster.start(Some(4), Some(1), None).await; - - // Let the nodes advance a bit - tokio::time::sleep(node_advance_delay).await; - - // Ensure that nodes have made progress - cluster.assert_progress(4, 2).await; - - // Now stop node 2 & 3 - cluster.authority(2).stop_all().await; - cluster.authority(3).stop_all().await; - - // wait and fetch the latest commit round - tokio::time::sleep(node_advance_delay).await; - let rounds_1 = cluster.assert_progress(2, 0).await; - - // wait and fetch again the rounds - tokio::time::sleep(node_advance_delay).await; - let rounds_2 = cluster.assert_progress(2, 0).await; - - // We assert that nodes haven't advanced at all - assert_eq!(rounds_1, rounds_2); - - // Now bring up nodes - cluster.authority(2).start(true, Some(1)).await; - cluster.authority(3).start(true, Some(1)).await; - - // wait and fetch the latest commit round. All of them should have advanced and we allow a small - // threshold in case some node is faster than the others - tokio::time::sleep(node_advance_delay).await; - let rounds_3 = cluster.assert_progress(4, 2).await; - - // we test that nodes 0 & 1 have actually advanced in rounds compared to before. - assert!(rounds_3.get(&0) > rounds_2.get(&0)); - assert!(rounds_3.get(&1) > rounds_2.get(&1)); -} - -#[ignore] -#[tokio::test] -/// We are testing the loss of liveness of a healthy cluster. While 3f+1 nodes run -/// we are shutting down f+1 nodes one by one with some delay between them. -/// Then we are bringing the f+1 nodes back again. We expect the cluster to -/// recover and effectively make progress. -async fn test_loss_of_liveness_with_recovery() { - // Enabled debug tracing so we can easily observe the - // nodes logs. - let _guard = setup_tracing(); - - let node_advance_delay = Duration::from_secs(60); - - // A cluster of 4 nodes will be created - let mut cluster = Cluster::new(None); - - // ===== Start the cluster ==== - cluster.start(Some(4), Some(1), None).await; - - // Let the nodes advance a bit - tokio::time::sleep(node_advance_delay).await; - - // Ensure that nodes have made progress - cluster.assert_progress(4, 2).await; - - // Now stop node 2 - cluster.authority(2).stop_all().await; - - // allow other nodes to advance - tokio::time::sleep(node_advance_delay).await; - - // Now stop node 3 - cluster.authority(3).stop_all().await; - - // wait and fetch the latest commit round - tokio::time::sleep(node_advance_delay).await; - let rounds_1 = cluster.assert_progress(2, 0).await; - - // wait and fetch again the rounds - tokio::time::sleep(node_advance_delay).await; - let rounds_2 = cluster.assert_progress(2, 0).await; - - // We assert that nodes haven't advanced at all - assert_eq!(rounds_1, rounds_2); - - // Now bring up nodes - cluster.authority(2).start(true, Some(1)).await; - cluster.authority(3).start(true, Some(1)).await; - - // wait and fetch the latest commit round - tokio::time::sleep(node_advance_delay).await; - let rounds_3 = cluster.assert_progress(4, 2).await; - - let round_2_max = rounds_2.values().max().unwrap(); - assert!( - rounds_3.values().all(|v| v > round_2_max), - "All the nodes should have advanced more from the previous round" - ); -} diff --git a/narwhal/test-utils/src/cluster.rs b/narwhal/test-utils/src/cluster.rs index cd0e426f2a90d..45202094800e4 100644 --- a/narwhal/test-utils/src/cluster.rs +++ b/narwhal/test-utils/src/cluster.rs @@ -754,6 +754,7 @@ impl AuthorityDetails { .get(worker_id) .unwrap() .transactions_address, + None, ) .unwrap(); diff --git a/narwhal/worker/src/lib.rs b/narwhal/worker/src/lib.rs index cb453b3bc12b9..f74fa44aca4fc 100644 --- a/narwhal/worker/src/lib.rs +++ b/narwhal/worker/src/lib.rs @@ -13,7 +13,6 @@ mod batch_maker; mod client; mod handlers; mod quorum_waiter; -mod transactions_server; mod tx_validator; mod worker; diff --git a/narwhal/worker/src/tests/worker_tests.rs b/narwhal/worker/src/tests/worker_tests.rs deleted file mode 100644 index 99421109124da..0000000000000 --- a/narwhal/worker/src/tests/worker_tests.rs +++ /dev/null @@ -1,371 +0,0 @@ -// Copyright (c) 2021, Facebook, Inc. and its affiliates -// Copyright (c) Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 -use super::*; -use crate::LocalNarwhalClient; -use crate::{metrics::initialise_metrics, TrivialTransactionValidator}; -use async_trait::async_trait; -use bytes::Bytes; -use fastcrypto::hash::Hash; -use futures::stream::FuturesOrdered; -use futures::StreamExt; -use primary::{CHANNEL_CAPACITY, NUM_SHUTDOWN_RECEIVERS}; -use prometheus::Registry; -use store::rocks; -use store::rocks::MetricConf; -use store::rocks::ReadWriteOptions; -use test_utils::{ - batch, latest_protocol_version, temp_dir, test_network, transaction, CommitteeFixture, -}; -use types::{ - BatchAPI, MockWorkerToPrimary, MockWorkerToWorker, PreSubscribedBroadcastSender, - TransactionProto, TransactionsClient, WorkerBatchMessage, WorkerToWorkerClient, -}; - -// A test validator that rejects every transaction / batch -#[derive(Clone)] -struct NilTxValidator; -#[async_trait] -impl TransactionValidator for NilTxValidator { - type Error = eyre::Report; - - fn validate(&self, _tx: &[u8]) -> Result<(), Self::Error> { - eyre::bail!("Invalid transaction"); - } - fn validate_batch( - &self, - _txs: &Batch, - _protocol_config: &ProtocolConfig, - ) -> Result<(), Self::Error> { - eyre::bail!("Invalid batch"); - } -} - -#[tokio::test] -async fn reject_invalid_clients_transactions() { - let fixture = CommitteeFixture::builder().randomize_ports(true).build(); - let committee = fixture.committee(); - let worker_cache = fixture.worker_cache(); - - let worker_id = 0; - let my_primary = fixture.authorities().next().unwrap(); - let myself = my_primary.worker(worker_id); - let public_key = my_primary.public_key(); - let client = NetworkClient::new_from_keypair(&my_primary.network_keypair()); - - let parameters = Parameters { - batch_size: 200, // Two transactions. - ..Parameters::default() - }; - - // Create a new test store. - let batch_store = rocks::DBMap::::open( - temp_dir(), - MetricConf::default(), - None, - Some("batches"), - &ReadWriteOptions::default(), - ) - .unwrap(); - - let registry = Registry::new(); - let metrics = initialise_metrics(®istry); - - let mut tx_shutdown = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS); - - // Spawn a `Worker` instance with a reject-all validator. - Worker::spawn( - my_primary.authority().clone(), - myself.keypair(), - worker_id, - committee.clone(), - worker_cache.clone(), - latest_protocol_version(), - parameters, - NilTxValidator, - client, - batch_store, - metrics, - &mut tx_shutdown, - ); - - // Wait till other services have been able to start up - tokio::task::yield_now().await; - // Send enough transactions to create a batch. - let address = worker_cache - .worker(&public_key, &worker_id) - .unwrap() - .transactions; - let config = mysten_network::config::Config::new(); - let channel = config.connect_lazy(&address).unwrap(); - let mut client = TransactionsClient::new(channel); - let tx = transaction(); - let txn = TransactionProto { - transactions: vec![Bytes::from(tx.clone())], - }; - - // Check invalid transactions are rejected - let res = client.submit_transaction(txn).await; - assert!(res.is_err()); - - let worker_pk = worker_cache.worker(&public_key, &worker_id).unwrap().name; - - let batch = batch(&latest_protocol_version()); - let batch_message = WorkerBatchMessage { - batch: batch.clone(), - }; - - // setup network : impersonate a send from another worker - let another_primary = fixture.authorities().nth(2).unwrap(); - let another_worker = another_primary.worker(worker_id); - let network = test_network( - another_worker.keypair(), - &another_worker.info().worker_address, - ); - // ensure that the networks are connected - network - .connect(myself.info().worker_address.to_anemo_address().unwrap()) - .await - .unwrap(); - let peer = network.peer(PeerId(worker_pk.0.to_bytes())).unwrap(); - - // Check invalid batches are rejected - let res = WorkerToWorkerClient::new(peer) - .report_batch(batch_message) - .await; - assert!(res.is_err()); -} - -/// TODO: test both RemoteNarwhalClient and LocalNarwhalClient in the same test case. -#[tokio::test] -async fn handle_remote_clients_transactions() { - let fixture = CommitteeFixture::builder().randomize_ports(true).build(); - let committee = fixture.committee(); - let worker_cache = fixture.worker_cache(); - - let worker_id = 0; - let my_primary = fixture.authorities().next().unwrap(); - let myself = my_primary.worker(worker_id); - let authority_public_key = my_primary.public_key(); - let client = NetworkClient::new_from_keypair(&my_primary.network_keypair()); - - let parameters = Parameters { - batch_size: 200, // Two transactions. - ..Parameters::default() - }; - - // Create a new test store. - let batch_store = rocks::DBMap::::open( - temp_dir(), - MetricConf::default(), - None, - Some("batches"), - &ReadWriteOptions::default(), - ) - .unwrap(); - - let registry = Registry::new(); - let metrics = initialise_metrics(®istry); - - let mut tx_shutdown = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS); - - // Spawn a `Worker` instance. - Worker::spawn( - my_primary.authority().clone(), - myself.keypair(), - worker_id, - committee.clone(), - worker_cache.clone(), - latest_protocol_version(), - parameters, - TrivialTransactionValidator, - client.clone(), - batch_store, - metrics, - &mut tx_shutdown, - ); - - // Spawn a network listener to receive our batch's digest. - let mut peer_networks = Vec::new(); - - // Create batches - let batch = batch(&latest_protocol_version()); - let batch_digest = batch.digest(); - - let (tx_await_batch, mut rx_await_batch) = test_utils::test_channel!(CHANNEL_CAPACITY); - let mut mock_primary_server = MockWorkerToPrimary::new(); - mock_primary_server - .expect_report_own_batch() - .withf(move |request| { - let message = request.body(); - - message.digest == batch_digest && message.worker_id == worker_id - }) - .times(1) - .returning(move |_| { - tx_await_batch.try_send(()).unwrap(); - Ok(anemo::Response::new(())) - }); - client.set_worker_to_primary_local_handler(Arc::new(mock_primary_server)); - - // Spawn enough workers' listeners to acknowledge our batches. - for worker in fixture.authorities().skip(1).map(|a| a.worker(worker_id)) { - let mut mock_server = MockWorkerToWorker::new(); - mock_server - .expect_report_batch() - .returning(|_| Ok(anemo::Response::new(()))); - let routes = anemo::Router::new().add_rpc_service(WorkerToWorkerServer::new(mock_server)); - peer_networks.push(worker.new_network(routes)); - } - - // Wait till other services have been able to start up - tokio::task::yield_now().await; - // Send enough transactions to create a batch. - let address = worker_cache - .worker(&authority_public_key, &worker_id) - .unwrap() - .transactions; - let config = mysten_network::config::Config::new(); - let channel = config.connect_lazy(&address).unwrap(); - let client = TransactionsClient::new(channel); - - let join_handle = tokio::task::spawn(async move { - let mut fut_list = FuturesOrdered::new(); - for tx in batch.transactions() { - let txn = TransactionProto { - transactions: vec![Bytes::from(tx.clone())], - }; - - // Calls to submit_transaction are now blocking, so we need to drive them - // all at the same time, rather than sequentially. - let mut inner_client = client.clone(); - fut_list.push_back(async move { - inner_client.submit_transaction(txn).await.unwrap(); - }); - } - - // Drive all sending in parallel. - while fut_list.next().await.is_some() {} - }); - - // Ensure the primary received the batch's digest (ie. it did not panic). - rx_await_batch.recv().await.unwrap(); - - // Ensure sending ended. - assert!(join_handle.await.is_ok()); -} - -/// TODO: test both RemoteNarwhalClient and LocalNarwhalClient in the same test case. -#[tokio::test] -async fn handle_local_clients_transactions() { - let fixture = CommitteeFixture::builder().randomize_ports(true).build(); - let committee = fixture.committee(); - let worker_cache = fixture.worker_cache(); - - let worker_id = 0; - let my_primary = fixture.authorities().next().unwrap(); - let myself = my_primary.worker(worker_id); - let authority_public_key = my_primary.public_key(); - let client = NetworkClient::new_from_keypair(&my_primary.network_keypair()); - - let parameters = Parameters { - batch_size: 200, // Two transactions. - ..Parameters::default() - }; - - // Create a new test store. - let batch_store = rocks::DBMap::::open( - temp_dir(), - MetricConf::default(), - None, - Some("batches"), - &ReadWriteOptions::default(), - ) - .unwrap(); - - let registry = Registry::new(); - let metrics = initialise_metrics(®istry); - - let mut tx_shutdown = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS); - - // Spawn a `Worker` instance. - Worker::spawn( - my_primary.authority().clone(), - myself.keypair(), - worker_id, - committee.clone(), - worker_cache.clone(), - latest_protocol_version(), - parameters, - TrivialTransactionValidator, - client.clone(), - batch_store, - metrics, - &mut tx_shutdown, - ); - - // Spawn a network listener to receive our batch's digest. - let mut peer_networks = Vec::new(); - - // Create batches - let batch = batch(&latest_protocol_version()); - let batch_digest = batch.digest(); - - let (tx_await_batch, mut rx_await_batch) = test_utils::test_channel!(CHANNEL_CAPACITY); - let mut mock_primary_server = MockWorkerToPrimary::new(); - mock_primary_server - .expect_report_own_batch() - .withf(move |request| { - let message = request.body(); - message.digest == batch_digest && message.worker_id == worker_id - }) - .times(1) - .returning(move |_| { - tx_await_batch.try_send(()).unwrap(); - Ok(anemo::Response::new(())) - }); - client.set_worker_to_primary_local_handler(Arc::new(mock_primary_server)); - - // Spawn enough workers' listeners to acknowledge our batches. - for worker in fixture.authorities().skip(1).map(|a| a.worker(worker_id)) { - let mut mock_server = MockWorkerToWorker::new(); - mock_server - .expect_report_batch() - .returning(|_| Ok(anemo::Response::new(()))); - let routes = anemo::Router::new().add_rpc_service(WorkerToWorkerServer::new(mock_server)); - peer_networks.push(worker.new_network(routes)); - } - - // Wait till other services have been able to start up - tokio::task::yield_now().await; - // Send enough transactions to create a batch. - let address = worker_cache - .worker(&authority_public_key, &worker_id) - .unwrap() - .transactions; - let client = LocalNarwhalClient::get_global(&address).unwrap().load(); - - let join_handle = tokio::task::spawn(async move { - let mut fut_list = FuturesOrdered::new(); - for txn in batch.transactions() { - // Calls to submit_transaction are now blocking, so we need to drive them - // all at the same time, rather than sequentially. - let inner_client = client.clone(); - fut_list.push_back(async move { - inner_client - .submit_transactions(vec![txn.clone()]) - .await - .unwrap(); - }); - } - - // Drive all sending in parallel. - while fut_list.next().await.is_some() {} - }); - - // Ensure the primary received the batch's digest (ie. it did not panic). - rx_await_batch.recv().await.unwrap(); - - // Ensure sending ended. - assert!(join_handle.await.is_ok()); -} diff --git a/narwhal/worker/src/transactions_server.rs b/narwhal/worker/src/transactions_server.rs deleted file mode 100644 index 0790083001c44..0000000000000 --- a/narwhal/worker/src/transactions_server.rs +++ /dev/null @@ -1,206 +0,0 @@ -// Copyright (c) Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 - -use crate::client::LocalNarwhalClient; -use crate::metrics::WorkerEndpointMetrics; -use crate::TransactionValidator; -use async_trait::async_trait; -use futures::stream::FuturesUnordered; -use futures::StreamExt; -use mysten_metrics::metered_channel::Sender; -use mysten_metrics::{monitored_scope, spawn_logged_monitored_task}; -use mysten_network::server::Server; -use mysten_network::Multiaddr; -use std::sync::Arc; -use std::time::Duration; -use tokio::task::JoinHandle; -use tokio::time::{sleep, timeout}; -use tonic::{Request, Response, Status}; -use tracing::{error, info, warn}; -use types::{ - ConditionalBroadcastReceiver, Empty, Transaction, TransactionProto, Transactions, - TransactionsServer, TxResponse, -}; - -pub struct TxServer { - address: Multiaddr, - rx_shutdown: ConditionalBroadcastReceiver, - endpoint_metrics: WorkerEndpointMetrics, - local_client: Arc, - validator: V, -} - -impl TxServer { - #[must_use] - pub fn spawn( - address: Multiaddr, - rx_shutdown: ConditionalBroadcastReceiver, - endpoint_metrics: WorkerEndpointMetrics, - tx_batch_maker: Sender<(Vec, TxResponse)>, - validator: V, - ) -> JoinHandle<()> { - // create and initialize local Narwhal client. - let local_client = LocalNarwhalClient::new(tx_batch_maker); - LocalNarwhalClient::set_global(address.clone(), local_client.clone()); - - spawn_logged_monitored_task!( - Self { - address, - rx_shutdown, - endpoint_metrics, - local_client, - validator, - } - .run(), - "TxServer" - ) - } - - async fn run(mut self) { - const MAX_RETRIES: usize = 10; - const RETRY_BACKOFF: Duration = Duration::from_millis(1_000); - const GRACEFUL_SHUTDOWN_DURATION: Duration = Duration::from_millis(2_000); - - // create the handler - let tx_handler = TxReceiverHandler { - local_client: self.local_client.clone(), - validator: self.validator, - }; - - // now create the server - let mut retries = MAX_RETRIES; - let mut server: Server; - - loop { - match mysten_network::config::Config::new() - .server_builder_with_metrics(self.endpoint_metrics.clone()) - .add_service(TransactionsServer::new(tx_handler.clone())) - .bind(&self.address) - .await - { - Ok(s) => { - server = s; - break; - } - Err(err) => { - retries -= 1; - if retries == 0 { - panic!( - "Couldn't boot transactions server, permanently failed: {}", - err - ); - } - - error!( - "Couldn't boot transactions server at try {}, will wait {}s and retry: {}", - retries, - RETRY_BACKOFF.as_secs_f64(), - err - ); - - sleep(RETRY_BACKOFF).await; - } - } - } - - let shutdown_handle = server.take_cancel_handle().unwrap(); - - let server_handle = spawn_logged_monitored_task!(server.serve()); - - // wait to receive a shutdown signal - let _ = self.rx_shutdown.receiver.recv().await; - - // once do just gracefully signal the node to shutdown - shutdown_handle.send(()).unwrap(); - - // now wait until the handle completes or timeout if it takes long time - match timeout(GRACEFUL_SHUTDOWN_DURATION, server_handle).await { - Ok(_) => { - info!("Successfully shutting down gracefully transactions server"); - } - Err(err) => { - warn!( - "Time out while waiting to gracefully shutdown transactions server: {}", - err - ) - } - } - } -} - -/// Defines how the network receiver handles incoming transactions. -#[derive(Clone)] -pub(crate) struct TxReceiverHandler { - pub(crate) local_client: Arc, - pub(crate) validator: V, -} - -#[async_trait] -impl Transactions for TxReceiverHandler { - async fn submit_transaction( - &self, - request: Request, - ) -> Result, Status> { - let _scope = monitored_scope("SubmitTransaction"); - let transactions = request.into_inner().transactions; - - let validate_scope = monitored_scope("SubmitTransaction_ValidateTx"); - for transaction in &transactions { - if self.validator.validate(transaction.as_ref()).is_err() { - return Err(Status::invalid_argument("Invalid transaction")); - } - } - drop(validate_scope); - - // Send the transaction to Narwhal via the local client. - let submit_scope = monitored_scope("SubmitTransaction_SubmitTx"); - self.local_client - .submit_transactions(transactions.iter().map(|x| x.to_vec()).collect()) - .await - .map_err(|e| Status::internal(e.to_string()))?; - drop(submit_scope); - Ok(Response::new(Empty {})) - } - - async fn submit_transaction_stream( - &self, - request: Request>, - ) -> Result, Status> { - let mut transactions = request.into_inner(); - let mut requests = FuturesUnordered::new(); - - let _scope = monitored_scope("SubmitTransactionStream"); - while let Some(Ok(request)) = transactions.next().await { - let num_txns = request.transactions.len(); - if num_txns != 1 { - return Err(Status::invalid_argument(format!( - "Stream contains an invalid number of transactions: {num_txns}" - ))); - } - let txn = &request.transactions[0]; - let validate_scope = monitored_scope("SubmitTransactionStream_ValidateTx"); - if let Err(err) = self.validator.validate(txn.as_ref()) { - // If the transaction is invalid (often cryptographically), better to drop the client - return Err(Status::invalid_argument(format!( - "Stream contains an invalid transaction {err}" - ))); - } - drop(validate_scope); - // Send the transaction to Narwhal via the local client. - // Note that here we do not wait for a response because this would - // mean that we process only a single message from this stream at a - // time. Instead we gather them and resolve them once the stream is over. - let submit_scope = monitored_scope("SubmitTransactionStream_SubmitTx"); - requests.push(self.local_client.submit_transactions(vec![txn.to_vec()])); - drop(submit_scope); - } - - while let Some(result) = requests.next().await { - if let Err(e) = result { - return Err(Status::internal(e.to_string())); - } - } - - Ok(Response::new(Empty {})) - } -} diff --git a/narwhal/worker/src/worker.rs b/narwhal/worker/src/worker.rs index 63064a1b6c925..b38b80a755623 100644 --- a/narwhal/worker/src/worker.rs +++ b/narwhal/worker/src/worker.rs @@ -41,15 +41,10 @@ use types::{ PrimaryToWorkerServer, WorkerToWorkerServer, }; -#[cfg(test)] -#[path = "tests/worker_tests.rs"] -pub mod worker_tests; - /// The default channel capacity for each channel of the worker. pub const CHANNEL_CAPACITY: usize = 1_000; use crate::metrics::{Metrics, WorkerEndpointMetrics, WorkerMetrics}; -use crate::transactions_server::TxServer; pub struct Worker { /// This authority. @@ -440,19 +435,20 @@ impl Worker { } /// Spawn all tasks responsible to handle clients transactions. + // TODO: finish deleting this. It's partially deleted already and may not work right. fn handle_clients_transactions( &self, mut shutdown_receivers: Vec, node_metrics: Arc, channel_metrics: Arc, - endpoint_metrics: WorkerEndpointMetrics, - validator: impl TransactionValidator, + _endpoint_metrics: WorkerEndpointMetrics, + _validator: impl TransactionValidator, client: NetworkClient, network: anemo::Network, ) -> Vec> { info!("Starting handler for transactions"); - let (tx_batch_maker, rx_batch_maker) = channel_with_total( + let (_tx_batch_maker, rx_batch_maker) = channel_with_total( CHANNEL_CAPACITY, &channel_metrics.tx_batch_maker, &channel_metrics.tx_batch_maker_total, @@ -476,14 +472,6 @@ impl Worker { }) .unwrap_or(address); - let tx_server_handle = TxServer::spawn( - address.clone(), - shutdown_receivers.pop().unwrap(), - endpoint_metrics, - tx_batch_maker, - validator, - ); - // The transactions are sent to the `BatchMaker` that assembles them into batches. It then broadcasts // (in a reliable manner) the batches to all other workers that share the same `id` as us. Finally, it // gathers the 'cancel handlers' of the messages and send them to the `QuorumWaiter`. @@ -518,6 +506,6 @@ impl Worker { self.id, address ); - vec![batch_maker_handle, quorum_waiter_handle, tx_server_handle] + vec![batch_maker_handle, quorum_waiter_handle] } }