Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Kademlia: Speed-up the record fetching #13081

Merged
merged 1 commit into from
Jan 5, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 50 additions & 25 deletions client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ impl DiscoveryConfig {
NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
.expect("value is a constant; constant is non-zero; qed."),
),
outbound_query_records: Vec::new(),
records_to_publish: Default::default(),
}
}
}
Expand Down Expand Up @@ -287,8 +287,12 @@ pub struct DiscoveryBehaviour {
allow_non_globals_in_dht: bool,
/// A cache of discovered external addresses. Only used for logging purposes.
known_external_addresses: LruHashSet<Multiaddr>,
/// A cache of outbound query records.
outbound_query_records: Vec<(record::Key, Vec<u8>)>,
/// Records to publish per QueryId.
///
/// After finishing a Kademlia query, libp2p will return us a list of the closest peers that
/// did not return the record(in `FinishedWithNoAdditionalRecord`). We will then put the record
/// to these peers.
records_to_publish: HashMap<QueryId, Record>,
}

impl DiscoveryBehaviour {
Expand Down Expand Up @@ -692,33 +696,54 @@ impl NetworkBehaviour for DiscoveryBehaviour {
KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetRecord(res),
stats,
step,
id,
..
} => {
let ev = match res {
Ok(ok) =>
if let GetRecordOk::FoundRecord(r) = ok {
self.outbound_query_records
.push((r.record.key, r.record.value));
Ok(GetRecordOk::FoundRecord(r)) => {
debug!(
target: "sub-libp2p",
"Libp2p => Found record ({:?}) with value: {:?}",
r.record.key,
r.record.value,
);

// Let's directly finish the query, as we are only interested in a
// quorum of 1.
if let Some(kad) = self.kademlia.as_mut() {
if let Some(mut query) = kad.query_mut(&id) {
query.finish();
}
}

self.records_to_publish.insert(id, r.record.clone());

DiscoveryOut::ValueFound(
vec![(r.record.key, r.record.value)],
stats.duration().unwrap_or_default(),
)
},
Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
cache_candidates,
}) => {
if cache_candidates.is_empty() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bkchr A bit late to this party, but should we not call self.records_to_publish.remove(&id) here?
I think if cache_candidates is empty, we could still have added something to records_to_publish for this query id which would not be cleared then.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙈

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continue
} else {
debug!(
target: "sub-libp2p",
"Libp2p => Query progressed to {:?} step (last: {:?})",
step.count,
step.last,
);
if step.last {
let records =
self.outbound_query_records.drain(..).collect();
DiscoveryOut::ValueFound(
records,
stats.duration().unwrap_or_default(),
)
} else {
continue
}

// Put the record to the `cache_candidates` that are nearest to the
// record key from our point of view of the network.
if let Some(record) = self.records_to_publish.remove(&id) {
if let Some(kad) = self.kademlia.as_mut() {
kad.put_record_to(
record,
cache_candidates.into_iter().map(|v| v.1),
Quorum::One,
);
}
},
}

continue
},
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
trace!(
target: "sub-libp2p",
Expand Down