diff --git a/Cargo.toml b/Cargo.toml index 4298ede054..51c73bd127 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,4 +18,5 @@ members = [ [patch.crates-io] # TODO: switch to crates.io once 0.45 is released -libp2p = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" } +libp2p = { git = "https://github.com/dignifiedquire/rust-libp2p", branch = "feat-kad-count" } +# libp2p = { path = "../rust-libp2p" } diff --git a/iroh-bitswap/src/behaviour.rs b/iroh-bitswap/src/behaviour.rs index e23c578672..1fc7b0993a 100644 --- a/iroh-bitswap/src/behaviour.rs +++ b/iroh-bitswap/src/behaviour.rs @@ -12,10 +12,10 @@ use libp2p::core::connection::ConnectionId; use libp2p::core::{ConnectedPoint, Multiaddr, PeerId}; use libp2p::swarm::handler::OneShotHandler; use libp2p::swarm::{ - IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, + DialError, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; use prometheus_client::registry::Registry; -use tracing::{debug, instrument, warn}; +use tracing::{debug, instrument, trace, warn}; use crate::message::{BitswapMessage, Priority}; use crate::protocol::{BitswapProtocol, Upgrade}; @@ -206,6 +206,20 @@ impl NetworkBehaviour for Bitswap { } } + #[instrument(skip(self, _handler))] + fn inject_dial_failure( + &mut self, + peer_id: Option, + _handler: Self::ConnectionHandler, + _error: &DialError, + ) { + trace!("failed to dial"); + if let Some(ref peer_id) = peer_id { + self.sessions.dial_failure(peer_id); + self.queries.dial_failure(peer_id); + } + } + #[instrument(skip(self))] fn inject_event(&mut self, peer_id: PeerId, connection: ConnectionId, message: HandlerEvent) { match message { @@ -254,7 +268,7 @@ impl NetworkBehaviour for Bitswap { .push_back(NetworkBehaviourAction::GenerateEvent(event)); } - // TODO: cancle Query::Send + // TODO: cancel Query::Send // Propagate Cancel Events for cid in message.wantlist().cancels() { diff --git a/iroh-bitswap/src/query.rs b/iroh-bitswap/src/query.rs index 13d53a7a28..b30cc36499 100644 --- a/iroh-bitswap/src/query.rs +++ b/iroh-bitswap/src/query.rs @@ -182,6 +182,10 @@ impl QueryManager { } } + pub fn dial_failure(&mut self, peer_id: &PeerId) { + self.disconnected(peer_id); + } + fn next_finished_query(&mut self) -> Option<(QueryId, Query)> { let mut next_query = None; for (query_id, query) in &self.queries { diff --git a/iroh-bitswap/src/session.rs b/iroh-bitswap/src/session.rs index 099df56c47..00c2774e72 100644 --- a/iroh-bitswap/src/session.rs +++ b/iroh-bitswap/src/session.rs @@ -11,7 +11,7 @@ use libp2p::{ }, PeerId, }; -use tracing::{debug, trace}; +use tracing::trace; use crate::{behaviour::BitswapHandler, query::QueryManager, BitswapEvent}; @@ -73,6 +73,12 @@ impl SessionManager { self.sessions.remove(peer_id); } + pub fn dial_failure(&mut self, peer_id: &PeerId) { + if let Some(session) = self.sessions.get_mut(peer_id) { + session.state = State::Disconnected; + } + } + pub fn create_session(&mut self, peer_id: &PeerId) { let session = self.sessions.entry(*peer_id).or_insert(Session { state: State::New, @@ -99,8 +105,13 @@ impl SessionManager { queries: &mut QueryManager, ) -> Option> { // cleanup disconnects - self.sessions - .retain(|_, s| !matches!(s.state, State::Disconnected)); + self.sessions.retain(|_id, s| { + if matches!(s.state, State::Disconnected) { + return false; + } + + true + }); // limit parallel dials let skip_dialing = @@ -117,7 +128,7 @@ impl SessionManager { // no dialing this round continue; } - trace!("Dialing {}", peer_id); + trace!("dialing {}", peer_id); let handler = Default::default(); session.state = State::Dialing(Instant::now()); @@ -134,13 +145,20 @@ impl SessionManager { State::Dialing(start) => { // check for dial timeouts if start.elapsed() >= self.config.dial_timeout { - debug!("dialing {}: timed out", peer_id); + trace!("dialing {}: timed out", peer_id); queries.disconnected(peer_id); session.state = State::Disconnected; } } State::Connected => { if let Some(event) = queries.poll_peer(peer_id) { + if let NetworkBehaviourAction::GenerateEvent( + BitswapEvent::OutboundQueryCompleted { .. }, + ) = event + { + session.query_count -= 1; + } + return Some(event); } } diff --git a/iroh-p2p/src/behaviour.rs b/iroh-p2p/src/behaviour.rs index 6562fefc6b..a7485d4d80 100644 --- a/iroh-p2p/src/behaviour.rs +++ b/iroh-p2p/src/behaviour.rs @@ -93,16 +93,18 @@ impl NodeBehaviour { .into(); let kad = if config.kademlia { - let local_peer_id = local_key.public().to_peer_id(); + let pub_key = local_key.public(); + // TODO: persist to store - let store = MemoryStore::new(local_peer_id.to_owned()); + let store = MemoryStore::new(pub_key.to_peer_id()); + // TODO: make user configurable let mut kad_config = KademliaConfig::default(); kad_config.set_parallelism(16usize.try_into().unwrap()); // TODO: potentially lower (this is per query) - kad_config.set_query_timeout(Duration::from_secs(5)); + kad_config.set_query_timeout(Duration::from_secs(60)); - let mut kademlia = Kademlia::with_config(local_peer_id, store, kad_config); + let mut kademlia = Kademlia::with_config(pub_key.to_peer_id(), store, kad_config); for multiaddr in &config.bootstrap_peers { // TODO: move parsing into config let mut addr = multiaddr.to_owned(); @@ -113,9 +115,12 @@ impl NodeBehaviour { warn!("Could not parse bootstrap addr {}", multiaddr); } } + + // Trigger initial bootstrap if let Err(e) = kademlia.bootstrap() { warn!("Kademlia bootstrap failed: {}", e); } + Some(kademlia) } else { None @@ -163,4 +168,12 @@ impl NodeBehaviour { kad.add_address(peer, addr); } } + + pub fn finish_query(&mut self, id: &libp2p::kad::QueryId) { + if let Some(kad) = self.kad.as_mut() { + if let Some(mut query) = kad.query_mut(id) { + query.finish(); + } + } + } } diff --git a/iroh-p2p/src/config.rs b/iroh-p2p/src/config.rs index fbf609200e..0aed7ba52e 100644 --- a/iroh-p2p/src/config.rs +++ b/iroh-p2p/src/config.rs @@ -83,7 +83,7 @@ impl Default for Libp2pConfig { bootstrap_peers, mdns: false, kademlia: true, - target_peer_count: 75, + target_peer_count: 256, rpc_addr: "0.0.0.0:4401".parse().unwrap(), rpc_client: RpcClientConfig::default(), metrics: MetricsConfig::default(), diff --git a/iroh-p2p/src/rpc.rs b/iroh-p2p/src/rpc.rs index ad47655805..def3ba393b 100644 --- a/iroh-p2p/src/rpc.rs +++ b/iroh-p2p/src/rpc.rs @@ -9,8 +9,9 @@ use futures::channel::oneshot; use libp2p::kad::record::Key; use libp2p::Multiaddr; use libp2p::PeerId; +use tokio::sync::mpsc; use tonic::{transport::Server as TonicServer, Request, Response, Status}; -use tracing::trace; +use tracing::{trace, warn}; use iroh_bitswap::{Block, QueryError}; use iroh_rpc_types::p2p::p2p_server; @@ -84,22 +85,33 @@ impl p2p_server::P2p for P2p { iroh_metrics::req::set_trace_ctx(&request); let req = request.into_inner(); trace!("received ProviderRequest: {:?}", req.key); - let (s, r) = oneshot::channel(); + let (s, mut r) = mpsc::channel(1024); let msg = RpcMessage::ProviderRequest { - key: req.key.into(), + key: req.key.clone().into(), response_channel: s, }; + self.sender .send(msg) .await .map_err(|_| Status::internal("receiver dropped"))?; - let providers = r - .await - .map_err(|_| Status::internal("sender dropped"))? - .map_err(|e| Status::internal(format!("failed to retrieve provider: {:?}", e)))?; + // TODO: streaming response + let mut providers = Vec::new(); + while let Some(provider) = r.recv().await { + match provider { + Ok(provider) => providers.push(provider.to_bytes()), + Err(e) => { + if providers.is_empty() { + return Err(Status::internal(e)); + } else { + warn!("error fetching providers for key {:?}: {:?}", req.key, e); + break; + } + } + } + } - let providers = providers.into_iter().map(|p| p.to_bytes()).collect(); Ok(Response::new(Providers { providers })) } @@ -219,7 +231,7 @@ pub enum RpcMessage { ProviderRequest { // TODO: potentially change this to Cid, as that is the only key we use for providers key: Key, - response_channel: oneshot::Sender, String>>, + response_channel: mpsc::Sender>, }, NetListeningAddrs(oneshot::Sender<(PeerId, Vec)>), NetPeers(oneshot::Sender>>), diff --git a/iroh-p2p/src/service.rs b/iroh-p2p/src/service.rs index a782f57674..2c75b1c33a 100644 --- a/iroh-p2p/src/service.rs +++ b/iroh-p2p/src/service.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::num::NonZeroU8; use std::time::Duration; @@ -6,17 +6,19 @@ use ahash::AHashMap; use anyhow::{anyhow, Context, Result}; use async_channel::{bounded as channel, Receiver}; use cid::Cid; -use futures::channel::oneshot::{self, Sender as OneShotSender}; +use futures::channel::oneshot::Sender as OneShotSender; use futures_util::stream::StreamExt; use iroh_rpc_client::Client as RpcClient; use libp2p::core::muxing::StreamMuxerBox; +use libp2p::core::transport::timeout::TransportTimeout; use libp2p::core::transport::Boxed; use libp2p::core::Multiaddr; pub use libp2p::gossipsub::{IdentTopic, Topic}; use libp2p::identify::{IdentifyEvent, IdentifyInfo}; use libp2p::identity::Keypair; use libp2p::kad::{ - self, record::Key, GetProvidersError, GetProvidersOk, KademliaEvent, QueryResult, + self, record::Key, GetProvidersError, GetProvidersOk, GetProvidersProgress, KademliaEvent, + QueryProgress, QueryResult, }; use libp2p::metrics::{Metrics, Recorder}; use libp2p::multiaddr::Protocol; @@ -28,7 +30,7 @@ use libp2p::swarm::{ use libp2p::yamux::WindowUpdateMode; use libp2p::{core, mplex, noise, yamux, PeerId, Swarm, Transport}; use prometheus_client::registry::Registry; -use tokio::{select, time}; +use tokio::{select, sync::mpsc, time}; use tracing::{debug, info, trace, warn}; use iroh_bitswap::{ @@ -62,7 +64,7 @@ pub struct Libp2pService { } enum QueryChannel { - GetProviders(Vec, String>>>), + GetProviders(Vec>>), } #[derive(Debug, Hash, PartialEq, Eq)] @@ -70,6 +72,8 @@ enum QueryKey { ProviderKey(Key), } +const PROVIDER_LIMIT: usize = 20; + impl Libp2pService { pub async fn new( config: Libp2pConfig, @@ -91,7 +95,7 @@ impl Libp2pService { let node = NodeBehaviour::new(&net_keypair, &config, registry).await?; let mut swarm = SwarmBuilder::new(transport, node, peer_id) .connection_limits(limits) - .notify_handler_buffer_size(std::num::NonZeroUsize::new(20).expect("Not zero")) // TODO: configurable + .notify_handler_buffer_size(20.try_into().unwrap()) // TODO: configurable .connection_event_buffer_size(128) .dial_concurrency_factor(NonZeroU8::new(16).unwrap()) .executor(Box::new(|fut| { @@ -221,35 +225,22 @@ impl Libp2pService { Event::Kademlia(e) => { self.metrics.record(&e); if let KademliaEvent::OutboundQueryCompleted { result, .. } = e { - debug!("kad: {:?}", result); + debug!("kad completed: {:?}", result); match result { - QueryResult::GetProviders(Ok(GetProvidersOk { - providers, key, .. - })) => { - if let Some(QueryChannel::GetProviders(chans)) = - self.kad_queries.remove(&QueryKey::ProviderKey(key.clone())) - { - for chan in chans.into_iter() { - debug!("Sending providers for {:?}", key); - chan.send(Ok(providers.clone())).ok(); - } - } else { - debug!("No listeners"); - } + QueryResult::GetProviders(Ok(GetProvidersOk { key, .. })) => { + let _ = self.kad_queries.remove(&QueryKey::ProviderKey(key)); } + QueryResult::GetProviders(Err(err)) => { - let (key, providers) = match err { - GetProvidersError::Timeout { key, providers, .. } => { - (key, providers) - } + let key = match err { + GetProvidersError::Timeout { key, .. } => key, }; debug!("GetProviders timeout {:?}", key); if let Some(QueryChannel::GetProviders(chans)) = - self.kad_queries.remove(&QueryKey::ProviderKey(key.clone())) + self.kad_queries.remove(&QueryKey::ProviderKey(key)) { for chan in chans.into_iter() { - debug!("Sending providers for {:?}", key); - chan.send(Ok(providers.clone())).ok(); + chan.send(Err("Timeout".into())).await.ok(); } } } @@ -257,6 +248,33 @@ impl Libp2pService { debug!("Libp2p => Unhandled Kademlia query result: {:?}", other) } } + } else if let KademliaEvent::OutboundQueryProgressed { + id, result, count, .. + } = e + { + debug!("kad progressed: {:?}", result); + match result { + QueryProgress::GetProviders(GetProvidersProgress { + key, provider, .. + }) => { + if count >= PROVIDER_LIMIT { + debug!("finish provider query {}/{}", count, PROVIDER_LIMIT); + // Finish query if we have enough providers. + self.swarm.behaviour_mut().finish_query(&id); + } + + if let Some(QueryChannel::GetProviders(chans)) = self + .kad_queries + .get_mut(&QueryKey::ProviderKey(key.clone())) + { + for chan in chans.iter_mut() { + chan.send(Ok(provider)).await.ok(); + } + } else { + debug!("No listeners"); + } + } + } } } Event::Identify(e) => { @@ -342,7 +360,10 @@ impl Libp2pService { ); } } else { - response_channel.send(Ok(Default::default())).ok(); + response_channel + .send(Err("kademlia is not available".into())) + .await + .ok(); } } RpcMessage::NetListeningAddrs(response_channel) => { @@ -407,6 +428,10 @@ pub async fn build_transport(local_key: Keypair) -> Boxed<(PeerId, StreamMuxerBo let transport = libp2p::websocket::WsConfig::new(libp2p::tcp::TokioTcpConfig::new().nodelay(true)) .or_transport(transport); + + // TODO: configurable + let transport = TransportTimeout::new(transport, Duration::from_secs(5)); + let transport = libp2p::dns::TokioDnsConfig::system(transport).unwrap(); let auth_config = { let dh_keys = noise::Keypair::::new() @@ -434,3 +459,59 @@ pub async fn build_transport(local_key: Keypair) -> Boxed<(PeerId, StreamMuxerBo .timeout(Duration::from_secs(20)) // TODO: configurable .boxed() } + +#[cfg(test)] +mod tests { + use crate::metrics; + + use super::*; + use anyhow::Result; + use libp2p::identity::ed25519; + + #[tokio::test] + async fn test_fetch_providers() -> Result<()> { + let mut prom_registry = Registry::default(); + let libp2p_metrics = Metrics::new(&mut prom_registry); + let net_keypair = { + let gen_keypair = ed25519::Keypair::generate(); + Keypair::Ed25519(gen_keypair) + }; + + let mut network_config = Libp2pConfig::default(); + network_config.metrics.debug = true; + let metrics_config = network_config.metrics.clone(); + + let mut p2p_service = Libp2pService::new( + network_config, + net_keypair, + &mut prom_registry, + libp2p_metrics, + ) + .await?; + + let metrics_handle = iroh_metrics::init_with_registry( + metrics::metrics_config_with_compile_time_info(metrics_config), + prom_registry, + ) + .await + .expect("failed to initialize metrics"); + + let cfg = iroh_rpc_client::Config::default(); + let p2p_task = tokio::task::spawn(async move { + p2p_service.run().await.unwrap(); + }); + + { + let client = RpcClient::new(&cfg).await?; + let c = "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR" + .parse() + .unwrap(); + let providers = client.p2p.fetch_providers(&c).await?; + assert!(providers.len() >= PROVIDER_LIMIT); + } + + p2p_task.abort(); + metrics_handle.shutdown(); + Ok(()) + } +}