feat(networking): add relay connectivity loop #1482
First changed file (the peer manager module):

```diff
@@ -5,13 +5,14 @@ else:
 import
-  std/[options, sets, sequtils, times],
+  std/[options, sets, sequtils, times, random],
   chronos,
   chronicles,
   metrics,
   libp2p/multistream
 import
   ../../utils/peers,
+  ../../waku/v2/protocol/waku_relay,
   ./peer_store/peer_storage,
   ./waku_peer_store
```
```diff
@@ -36,6 +37,12 @@ const
   # TODO: Make configurable
   DefaultDialTimeout = chronos.seconds(10)
 
+  # limit the amount of paralel dials
+  MaxParalelDials = 10
+
+  # delay between consecutive relayConnectivityLoop runs
+  ConnectivityLoopInterval = chronos.seconds(30)
+
 ####################
 # Helper functions #
 ####################
```
```diff
@@ -179,19 +186,21 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
     # Do not attempt to manage our unmanageable self
     return
 
-  debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
-
-  # ...known addresses
-  for multiaddr in remotePeerInfo.addrs:
-    pm.peerStore[AddressBook][remotePeerInfo.peerId] = pm.peerStore[AddressBook][remotePeerInfo.peerId] & multiaddr
-
   # ...public key
   var publicKey: PublicKey
   discard remotePeerInfo.peerId.extractPublicKey(publicKey)
 
+  if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
+     pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey:
+    # Peer already managed
+    return
```
Review comment on lines +193 to +196 (reviewer):

For readability reasons, this complex if statement condition should be replaced by a proc.

Reply (PR author):

Just tried to implement that, but then I realized that you end up extracting the public key twice, roughly:

```nim
proc isPeerManaged(pm: PeerManager, remotePeerInfo: RemotePeerInfo): bool =
  var publicKey: PublicKey
  discard remotePeerInfo.peerId.extractPublicKey(publicKey)
  if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
     pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey:
    # Peer already managed
    return true
```

Why is that not part of remotePeerInfo? Well, perhaps it should be, but that's beyond this PR.

Reply (reviewer):

🤷🏼 If I were working in Waku's peer management, I would modify the remote peer type to include the public key. As a temporary solution until the remote peer type includes the public key, I don't see any problem with extracting the key twice.
The hunk continues:

```diff
+
+  debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs, proto = proto
+
+  pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
+  pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
 
   # nim-libp2p identify overrides this
   # TODO: Remove this once service slots is ready
   pm.peerStore[ProtoBook][remotePeerInfo.peerId] = pm.peerStore[ProtoBook][remotePeerInfo.peerId] & proto
 
   # Add peer to storage. Entry will subsequently be updated with connectedness information
```
```diff
@@ -284,3 +293,34 @@ proc connectToNodes*(pm: PeerManager,
   # This issue was known to Dmitiry on nim-libp2p and may be resolvable
   # later.
   await sleepAsync(chronos.seconds(5))
+
+# Ensures a healthy amount of connected relay peers
+proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
+  while true:
+
+    let maxConnections = pm.switch.connManager.inSema.size
+    let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
+    let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len
+    let numConPeers = numInPeers + numOutPeers
+
+    # TODO: Enforce a given in/out peers ratio
+
+    # Leave some room for service peers
+    if numConPeers >= (maxConnections - 5):
+      await sleepAsync(ConnectivityLoopInterval)
+      continue
+
+    # TODO: Respect backoff before attempting to connect a relay peer
+    var disconnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
```
Review comment on the `getNotConnectedPeers` line: Minor: …
The hunk continues:

```diff
+    shuffle(disconnectedPeers)
+
+    let numPeersToConnect = min(min(maxConnections - numConPeers, disconnectedPeers.len), MaxParalelDials)
+
+    info "Relay connectivity loop",
+      connectedPeers = numConPeers,
+      targetConnectedPeers = maxConnections,
+      availableDisconnectedPeers = disconnectedPeers.len
```
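To make the dial sizing concrete, here is a small worked example of the `numPeersToConnect` arithmetic; the numbers are illustrative and not taken from the PR:

```nim
# Illustrative values only: 50-connection limit, 20 peers connected,
# 37 known-but-disconnected peers, and the PR's MaxParalelDials of 10.
let
  maxConnections  = 50
  numConPeers     = 20
  numDisconnected = 37
  maxParalelDials = 10

# Dial at most: the free connection slots, the available disconnected
# peers, and the per-iteration cap, whichever is smallest.
let numPeersToConnect = min(min(maxConnections - numConPeers, numDisconnected), maxParalelDials)
echo numPeersToConnect  # prints 10 (capped by the parallel-dial limit)
```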
Review comment on lines +319 to +322 (reviewer):

Could we move this to debug level?

Reply (PR author):

IMHO this log is quite important, and it's not spammy. It's quite common in blockchains to have this constant feedback on connected peers. It's also useful to quickly see whether there is a problem: say I don't have enough peers; a quick look at this log shows it right away.

Reply (reviewer):

Ok. One observation about "quite common in blockchains to have this constant feedback on connected peers": a blockchain node is an application, not a library.
The hunk continues:

```diff
+
+    await pm.connectToNodes(disconnectedPeers[0..<numPeersToConnect], WakuRelayCodec)
+
+    await sleepAsync(ConnectivityLoopInterval)
```
Second changed file (the Waku node module):
```diff
@@ -392,6 +392,9 @@ proc startRelay*(node: WakuNode) {.async.} =
     protocolMatcher(WakuRelayCodec),
     backoffPeriod)
 
+  # Maintain relay connections
+  asyncSpawn node.peerManager.relayConnectivityLoop()
+
```
Review comment on the asyncSpawn line (reviewer):

IMHO, we should avoid calling asyncSpawn from inside procs like this one, and instead launch long-running tasks from a single, visible place. Doing it that way will make it simpler to understand how different scheduled tasks interact and compete for CPU time, as there will be no unseen tasks launched. I know this is not new, and there are more cases like this already in the codebase.

Reply (PR author):

Yeap, agree. Adding it to the TODO list.
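As a rough illustration of the direction suggested in that comment (not something this PR implements), the spawns could be gathered in one top-level proc so every background task is visible in a single place; the proc name below is hypothetical:

```nim
# Hypothetical sketch only: collect all long-lived loops in one place
# instead of hiding asyncSpawn calls inside individual start procs.
proc startBackgroundTasks(node: WakuNode) =
  # Every scheduled task is launched here, and only here.
  asyncSpawn node.peerManager.relayConnectivityLoop()
  asyncSpawn node.keepaliveLoop(defaultKeepalive)
```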
The hunk continues:

```diff
 
   # Start the WakuRelay protocol
   await node.wakuRelay.start()
```
```diff
@@ -897,9 +900,6 @@ proc startKeepalive*(node: WakuNode) =
 
   asyncSpawn node.keepaliveLoop(defaultKeepalive)
```
Review comment on the asyncSpawn line (reviewer):

This is the case for this statement, for example: an asyncSpawn already launched from inside a start proc.
The hunk continues:

```diff
 
-# TODO: Decouple discovery logic from connection logic
-# A discovered peer goes to the PeerStore
-# The PeerManager uses to PeerStore to dial peers
 proc runDiscv5Loop(node: WakuNode) {.async.} =
   ## Continuously add newly discovered nodes
   ## using Node Discovery v5
```
```diff
@@ -920,12 +920,9 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =
 
     trace "Discovered peers", count=discoveredPeers.get().len()
 
-    let newPeers = discoveredPeers.get().filterIt(
-      not node.switch.isConnected(it.peerId))
-
-    if newPeers.len > 0:
-      debug "Connecting to newly discovered peers", count=newPeers.len()
-      await node.connectToNodes(newPeers, "discv5")
+    for peer in discoveredPeers.get():
+      # TODO: proto: WakuRelayCodec will be removed from add peer
+      node.peerManager.addPeer(peer, WakuRelayCodec)
 
     # Discovery `queryRandom` can have a synchronous fast path for example
     # when no peers are in the routing table. Don't run it in continuous loop.
```
Review comment on the peerStore bracket-access lines (reviewer):

Could you abstract these bracket access statements (this one and the ones below) with procs? For example, an addressBook accessor. Note that addressBook is a "property": a getter proc, called without parentheses, returning the field, and it has a noun name.

Reply (PR author):

Mm, I kind of agree, but we would need quite a lot of boilerplate code to wrap this: one new proc per "book". One of the cool things about nim-libp2p's extended peerstore is that it expands the original peerstore with just peerStore[whatever]. Since this implies modifications that don't per se add functionality, I would like to leave it like that for now.
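The reviewer's example itself did not survive the page extraction, but from the description ("a getter proc, called without parentheses, returning the field") the suggested accessors would presumably look something like the sketch below; the proc names and the `auto` return type are assumptions, not code from the PR:

```nim
# Hypothetical accessor sketch: one noun-named getter per peer-store "book",
# so call sites read pm.addressBook[peerId] instead of pm.peerStore[AddressBook][peerId].
proc addressBook(pm: PeerManager): auto =
  pm.peerStore[AddressBook]

proc protoBook(pm: PeerManager): auto =
  pm.peerStore[ProtoBook]

# Example call site (illustrative):
#   pm.addressBook[remotePeerInfo.peerId] = remotePeerInfo.addrs
```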