Skip to content

Commit

Permalink
Merge pull request #124 from n0-computer/feat-faster-dht-provider
Browse files Browse the repository at this point in the history
feat: improve provider fetching
  • Loading branch information
dignifiedquire authored Jun 16, 2022
2 parents 652b7ff + 96514d3 commit feeff29
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 51 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ members = [

[patch.crates-io]
# TODO: switch to crates.io once 0.45 is released
libp2p = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" }
libp2p = { git = "https://github.com/dignifiedquire/rust-libp2p", branch = "feat-kad-count" }
# libp2p = { path = "../rust-libp2p" }
20 changes: 17 additions & 3 deletions iroh-bitswap/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use libp2p::core::connection::ConnectionId;
use libp2p::core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p::swarm::handler::OneShotHandler;
use libp2p::swarm::{
IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
DialError, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use prometheus_client::registry::Registry;
use tracing::{debug, instrument, warn};
use tracing::{debug, instrument, trace, warn};

use crate::message::{BitswapMessage, Priority};
use crate::protocol::{BitswapProtocol, Upgrade};
Expand Down Expand Up @@ -206,6 +206,20 @@ impl NetworkBehaviour for Bitswap {
}
}

#[instrument(skip(self, _handler))]
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
_handler: Self::ConnectionHandler,
_error: &DialError,
) {
trace!("failed to dial");
if let Some(ref peer_id) = peer_id {
self.sessions.dial_failure(peer_id);
self.queries.dial_failure(peer_id);
}
}

#[instrument(skip(self))]
fn inject_event(&mut self, peer_id: PeerId, connection: ConnectionId, message: HandlerEvent) {
match message {
Expand Down Expand Up @@ -254,7 +268,7 @@ impl NetworkBehaviour for Bitswap {
.push_back(NetworkBehaviourAction::GenerateEvent(event));
}

// TODO: cancle Query::Send
// TODO: cancel Query::Send

// Propagate Cancel Events
for cid in message.wantlist().cancels() {
Expand Down
4 changes: 4 additions & 0 deletions iroh-bitswap/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ impl QueryManager {
}
}

pub fn dial_failure(&mut self, peer_id: &PeerId) {
self.disconnected(peer_id);
}

fn next_finished_query(&mut self) -> Option<(QueryId, Query)> {
let mut next_query = None;
for (query_id, query) in &self.queries {
Expand Down
28 changes: 23 additions & 5 deletions iroh-bitswap/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use libp2p::{
},
PeerId,
};
use tracing::{debug, trace};
use tracing::trace;

use crate::{behaviour::BitswapHandler, query::QueryManager, BitswapEvent};

Expand Down Expand Up @@ -73,6 +73,12 @@ impl SessionManager {
self.sessions.remove(peer_id);
}

pub fn dial_failure(&mut self, peer_id: &PeerId) {
if let Some(session) = self.sessions.get_mut(peer_id) {
session.state = State::Disconnected;
}
}

pub fn create_session(&mut self, peer_id: &PeerId) {
let session = self.sessions.entry(*peer_id).or_insert(Session {
state: State::New,
Expand All @@ -99,8 +105,13 @@ impl SessionManager {
queries: &mut QueryManager,
) -> Option<NetworkBehaviourAction<BitswapEvent, BitswapHandler>> {
// cleanup disconnects
self.sessions
.retain(|_, s| !matches!(s.state, State::Disconnected));
self.sessions.retain(|_id, s| {
if matches!(s.state, State::Disconnected) {
return false;
}

true
});

// limit parallel dials
let skip_dialing =
Expand All @@ -117,7 +128,7 @@ impl SessionManager {
// no dialing this round
continue;
}
trace!("Dialing {}", peer_id);
trace!("dialing {}", peer_id);
let handler = Default::default();
session.state = State::Dialing(Instant::now());

Expand All @@ -134,13 +145,20 @@ impl SessionManager {
State::Dialing(start) => {
// check for dial timeouts
if start.elapsed() >= self.config.dial_timeout {
debug!("dialing {}: timed out", peer_id);
trace!("dialing {}: timed out", peer_id);
queries.disconnected(peer_id);
session.state = State::Disconnected;
}
}
State::Connected => {
if let Some(event) = queries.poll_peer(peer_id) {
if let NetworkBehaviourAction::GenerateEvent(
BitswapEvent::OutboundQueryCompleted { .. },
) = event
{
session.query_count -= 1;
}

return Some(event);
}
}
Expand Down
21 changes: 17 additions & 4 deletions iroh-p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,18 @@ impl NodeBehaviour {
.into();

let kad = if config.kademlia {
let local_peer_id = local_key.public().to_peer_id();
let pub_key = local_key.public();

// TODO: persist to store
let store = MemoryStore::new(local_peer_id.to_owned());
let store = MemoryStore::new(pub_key.to_peer_id());

// TODO: make user configurable
let mut kad_config = KademliaConfig::default();
kad_config.set_parallelism(16usize.try_into().unwrap());
// TODO: potentially lower (this is per query)
kad_config.set_query_timeout(Duration::from_secs(5));
kad_config.set_query_timeout(Duration::from_secs(60));

let mut kademlia = Kademlia::with_config(local_peer_id, store, kad_config);
let mut kademlia = Kademlia::with_config(pub_key.to_peer_id(), store, kad_config);
for multiaddr in &config.bootstrap_peers {
// TODO: move parsing into config
let mut addr = multiaddr.to_owned();
Expand All @@ -113,9 +115,12 @@ impl NodeBehaviour {
warn!("Could not parse bootstrap addr {}", multiaddr);
}
}

// Trigger initial bootstrap
if let Err(e) = kademlia.bootstrap() {
warn!("Kademlia bootstrap failed: {}", e);
}

Some(kademlia)
} else {
None
Expand Down Expand Up @@ -163,4 +168,12 @@ impl NodeBehaviour {
kad.add_address(peer, addr);
}
}

pub fn finish_query(&mut self, id: &libp2p::kad::QueryId) {
if let Some(kad) = self.kad.as_mut() {
if let Some(mut query) = kad.query_mut(id) {
query.finish();
}
}
}
}
2 changes: 1 addition & 1 deletion iroh-p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl Default for Libp2pConfig {
bootstrap_peers,
mdns: false,
kademlia: true,
target_peer_count: 75,
target_peer_count: 256,
rpc_addr: "0.0.0.0:4401".parse().unwrap(),
rpc_client: RpcClientConfig::default(),
metrics: MetricsConfig::default(),
Expand Down
30 changes: 21 additions & 9 deletions iroh-p2p/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use futures::channel::oneshot;
use libp2p::kad::record::Key;
use libp2p::Multiaddr;
use libp2p::PeerId;
use tokio::sync::mpsc;
use tonic::{transport::Server as TonicServer, Request, Response, Status};
use tracing::trace;
use tracing::{trace, warn};

use iroh_bitswap::{Block, QueryError};
use iroh_rpc_types::p2p::p2p_server;
Expand Down Expand Up @@ -84,22 +85,33 @@ impl p2p_server::P2p for P2p {
iroh_metrics::req::set_trace_ctx(&request);
let req = request.into_inner();
trace!("received ProviderRequest: {:?}", req.key);
let (s, r) = oneshot::channel();
let (s, mut r) = mpsc::channel(1024);
let msg = RpcMessage::ProviderRequest {
key: req.key.into(),
key: req.key.clone().into(),
response_channel: s,
};

self.sender
.send(msg)
.await
.map_err(|_| Status::internal("receiver dropped"))?;

let providers = r
.await
.map_err(|_| Status::internal("sender dropped"))?
.map_err(|e| Status::internal(format!("failed to retrieve provider: {:?}", e)))?;
// TODO: streaming response
let mut providers = Vec::new();
while let Some(provider) = r.recv().await {
match provider {
Ok(provider) => providers.push(provider.to_bytes()),
Err(e) => {
if providers.is_empty() {
return Err(Status::internal(e));
} else {
warn!("error fetching providers for key {:?}: {:?}", req.key, e);
break;
}
}
}
}

let providers = providers.into_iter().map(|p| p.to_bytes()).collect();
Ok(Response::new(Providers { providers }))
}

Expand Down Expand Up @@ -219,7 +231,7 @@ pub enum RpcMessage {
ProviderRequest {
// TODO: potentially change this to Cid, as that is the only key we use for providers
key: Key,
response_channel: oneshot::Sender<Result<HashSet<PeerId>, String>>,
response_channel: mpsc::Sender<Result<PeerId, String>>,
},
NetListeningAddrs(oneshot::Sender<(PeerId, Vec<Multiaddr>)>),
NetPeers(oneshot::Sender<HashMap<PeerId, Vec<Multiaddr>>>),
Expand Down
Loading

0 comments on commit feeff29

Please sign in to comment.