feat(networking): add relay connectivity loop #1482

Merged: 4 commits, Jan 18, 2023
1 change: 1 addition & 0 deletions tests/all_tests_v2.nim
@@ -30,6 +30,7 @@ import
./v2/test_waku_filter,
./v2/test_wakunode_filter,
./v2/test_waku_peer_exchange,
./v2/test_peer_store_extended,
./v2/test_waku_payload,
./v2/test_waku_swap,
./v2/test_utils_peers,
28 changes: 27 additions & 1 deletion tests/v2/test_peer_store_extended.nim
@@ -44,6 +44,7 @@ suite "Extended nim-libp2p Peer Store":
peerStore[ConnectionBook][p1] = Connected
peerStore[DisconnectBook][p1] = 0
peerStore[SourceBook][p1] = Discv5
peerStore[DirectionBook][p1] = Inbound

# Peer2: Connected
peerStore[AddressBook][p2] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/2").tryGet()]
@@ -54,6 +55,7 @@ suite "Extended nim-libp2p Peer Store":
peerStore[ConnectionBook][p2] = Connected
peerStore[DisconnectBook][p2] = 0
peerStore[SourceBook][p2] = Discv5
peerStore[DirectionBook][p2] = Inbound

# Peer3: Connected
peerStore[AddressBook][p3] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
@@ -64,6 +66,7 @@ suite "Extended nim-libp2p Peer Store":
peerStore[ConnectionBook][p3] = Connected
peerStore[DisconnectBook][p3] = 0
peerStore[SourceBook][p3] = Discv5
peerStore[DirectionBook][p3] = Inbound

# Peer4: Added but never connected
peerStore[AddressBook][p4] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/4").tryGet()]
@@ -74,6 +77,7 @@ suite "Extended nim-libp2p Peer Store":
peerStore[ConnectionBook][p4] = NotConnected
peerStore[DisconnectBook][p4] = 0
peerStore[SourceBook][p4] = Discv5
peerStore[DirectionBook][p4] = Inbound

# Peer5: Connected in the past
peerStore[AddressBook][p5] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/5").tryGet()]
@@ -84,6 +88,7 @@ suite "Extended nim-libp2p Peer Store":
peerStore[ConnectionBook][p5] = CanConnect
peerStore[DisconnectBook][p5] = 1000
peerStore[SourceBook][p5] = Discv5
peerStore[DirectionBook][p5] = Outbound

test "get() returns the correct StoredInfo for a given PeerId":
# When
@@ -113,7 +118,7 @@ suite "Extended nim-libp2p Peer Store":
storedInfoPeer6.protoVersion == ""
storedInfoPeer6.connectedness == NotConnected
storedInfoPeer6.disconnectTime == 0
storedInfoPeer6.origin == Unknown
storedInfoPeer6.origin == UnknownOrigin

test "peers() returns all StoredInfo of the PeerStore":
# When
@@ -254,3 +259,24 @@ suite "Extended nim-libp2p Peer Store":
swapPeer.isSome()
swapPeer.get().peerId == p5
swapPeer.get().protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]

test "getPeersByDirection()":
# When
let inPeers = peerStore.getPeersByDirection(Inbound)
let outPeers = peerStore.getPeersByDirection(Outbound)

# Then
check:
inPeers.len == 4
outPeers.len == 1

test "getNotConnectedPeers()":
# When
let disconnedtedPeers = peerStore.getNotConnectedPeers()

# Then
check:
disconnedtedPeers.len == 2
disconnedtedPeers.anyIt(it.peerId == p4)
disconnedtedPeers.anyIt(it.peerId == p5)
not disconnedtedPeers.anyIt(it.connectedness == Connected)
56 changes: 48 additions & 8 deletions waku/v2/node/peer_manager/peer_manager.nim
@@ -5,13 +5,14 @@ else:


import
std/[options, sets, sequtils, times],
std/[options, sets, sequtils, times, random],
chronos,
chronicles,
metrics,
libp2p/multistream
import
../../utils/peers,
../../waku/v2/protocol/waku_relay,
./peer_store/peer_storage,
./waku_peer_store

@@ -36,6 +37,12 @@ const
# TODO: Make configurable
DefaultDialTimeout = chronos.seconds(10)

# Limit the number of parallel dials
MaxParalelDials = 10

# delay between consecutive relayConnectivityLoop runs
ConnectivityLoopInterval = chronos.seconds(30)

####################
# Helper functions #
####################
@@ -179,19 +186,21 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
# Do not attempt to manage our unmanageable self
return

debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto

# ...known addresses
for multiaddr in remotePeerInfo.addrs:
pm.peerStore[AddressBook][remotePeerInfo.peerId] = pm.peerStore[AddressBook][remotePeerInfo.peerId] & multiaddr

# ...public key
var publicKey: PublicKey
discard remotePeerInfo.peerId.extractPublicKey(publicKey)

if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
Contributor:

Could you abstract these bracket access statements (this one and the ones below) with procs? For example:

Suggested change
if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
if pm.addressBook.getByPeerId(remotePeerInfo.peerId) == remotePeerInfo.addrs and

Note that addressBook is a "property": a getter proc with a noun name, called without parentheses, that returns the field.

Author:

Mm, I kind of agree, but we would need quite a lot of boilerplate code to wrap this: one new proc per "book". One of the nice things about the nim-libp2p extended peer store is that you can extend the original peer store with just peerStore[whatever].

Since it implies modifications that don't per se add functionality, I would like to leave it as is for now.

pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey:
# Peer already managed
return
Comment on lines +193 to +196
Contributor:

For readability reasons, this complex if statement condition should be replaced by a pm.addressBook.isPeerManaged() proc returning a boolean. If you do that, the comment on line 189 will be unnecessary.

Author:

I just tried to implement that, but then I realized you need to extractPublicKey twice, because it's not a field in the remotePeerInfo. Unless I'm missing something:

proc pm.addressBook.isPeerManaged(remotePeerInfo): bool = 
    var publicKey: PublicKey
    discard remotePeerInfo.peerId.extractPublicKey(publicKey)

    if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
     pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey:
    # Peer already managed
    return true

Why isn't that part of remotePeerInfo? Well, perhaps it should be, but that's beyond this PR.

Contributor:

Why isn't that part of remotePeerInfo? Well, perhaps it should be, but that's beyond this PR.

🤷🏼

If I were working in Waku's peer management, I would modify the RemotePeerInfo type and add it. This is a fundamental issue that should be fixed if it helps improve code readability and reduces the cognitive load of the code.

As a temporary solution until the remote peer type includes the public key, I don't see any problem with extracting the key twice.
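
For reference, a minimal sketch of the helper this thread converges on, shown purely as an illustration and not part of this PR; it accepts the double key extraction discussed above, and the proc name and signature are assumptions:

proc isPeerManaged(pm: PeerManager, remotePeerInfo: RemotePeerInfo): bool =
  # Extract the public key again here, since RemotePeerInfo does not carry it.
  var publicKey: PublicKey
  discard remotePeerInfo.peerId.extractPublicKey(publicKey)

  return pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
         pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey

With such a helper, addPeer could early-return on pm.isPeerManaged(remotePeerInfo) instead of carrying the inline condition and its explanatory comment.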


debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs, proto = proto

pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey

# nim-libp2p identify overrides this
# TODO: Remove this once service slots is ready
pm.peerStore[ProtoBook][remotePeerInfo.peerId] = pm.peerStore[ProtoBook][remotePeerInfo.peerId] & proto

# Add peer to storage. Entry will subsequently be updated with connectedness information
@@ -284,3 +293,34 @@ proc connectToNodes*(pm: PeerManager,
# This issue was known to Dmitry on nim-libp2p and may be resolvable
# later.
await sleepAsync(chronos.seconds(5))

# Ensures a healthy number of connected relay peers
proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
while true:

let maxConnections = pm.switch.connManager.inSema.size
let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len
let numConPeers = numInPeers + numOutPeers

# TODO: Enforce a given in/out peers ratio

# Leave some room for service peers
if numConPeers >= (maxConnections - 5):
await sleepAsync(ConnectivityLoopInterval)
continue

# TODO: Respect backoff before attempting to connect a relay peer
var disconnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
Contributor:

Minor: notConnectedPeers

shuffle(disconnectedPeers)

let numPeersToConnect = min(min(maxConnections - numConPeers, disconnectedPeers.len), MaxParalelDials)

info "Relay connectivity loop",
connectedPeers = numConPeers,
targetConnectedPeers = maxConnections,
availableDisconnectedPeers = disconnectedPeers.len
Comment on lines +319 to +322
Contributor:

Could we move this to debug level?

Author:

IMHO this log is quite important, and it's not spammy. It's quite common in blockchains to have this constant feedback on connected peers. It's also useful for quickly spotting problems: say I don't have enough peers, then looking at availableDisconnectedPeers might help. And it's always good to see the difference between my target number of peers and the ones that are actually connected.

Contributor:

Ok.

One observation about "quite common in blockchains to have this constant feedback on connected peers.":

Logs, exceptions, and other "out of the code execution flow" elements are part of an application's public-facing API. Good API design states that a library module, like the Waku node, should not make assumptions about this matter.

A blockchain node is an application, not a library.


await pm.connectToNodes(disconnectedPeers[0..<numPeersToConnect], WakuRelayCodec)

await sleepAsync(ConnectivityLoopInterval)
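
As a rough worked example of the dial budget computed above, with assumed numbers that are not from this PR:

# Illustration only; the values below are assumptions.
let maxConnections = 50      # switch connection limit
let numConPeers = 12         # currently connected peers (in + out)
let candidates = 100         # peers returned by getNotConnectedPeers()
let maxParalelDials = 10     # mirrors the MaxParalelDials constant above
# min(min(50 - 12, 100), 10) == 10 peers are dialed in this iteration,
# then the loop sleeps for ConnectivityLoopInterval.
doAssert min(min(maxConnections - numConPeers, candidates), maxParalelDials) == 10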
5 changes: 4 additions & 1 deletion waku/v2/node/peer_manager/waku_peer_store.nim
@@ -142,4 +142,7 @@ proc selectPeer*(peerStore: PeerStore, proto: string): Option[RemotePeerInfo] =
return none(RemotePeerInfo)

proc getPeersByDirection*(peerStore: PeerStore, direction: Direction): seq[StoredInfo] =
return peerStore.peers().filterIt(it.direction == direction)
return peerStore.peers.filterIt(it.direction == direction)

proc getNotConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] =
return peerStore.peers.filterIt(it.connectedness != Connected)
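
A brief usage sketch of the two new accessors, as an illustration only; it assumes a populated peerStore such as the one built in the tests above, and the address filter is an assumption rather than something this PR does:

import std/sequtils

# Peers dialed by the remote side, and re-dial candidates that still have at
# least one known address.
let inboundPeers = peerStore.getPeersByDirection(Inbound)
let redialCandidates = peerStore.getNotConnectedPeers().filterIt(it.addrs.len > 0)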
15 changes: 6 additions & 9 deletions waku/v2/node/waku_node.nim
@@ -392,6 +392,9 @@ proc startRelay*(node: WakuNode) {.async.} =
protocolMatcher(WakuRelayCodec),
backoffPeriod)

# Maintain relay connections
asyncSpawn node.peerManager.relayConnectivityLoop()
Contributor:

IMHO, we should avoid calling asyncSpawn within the Waku node module. All such tasks should be spawned by the application, all in the same place; in our case, the wakunode2 module.

Doing it that way will make it simpler to understand how different scheduled tasks interact and compete for CPU time, as there will be no unseen tasks launched.

I know this is not new, and there are more asyncSpawn calls scattered within the protocols and the Waku node module code, but it is in our hands to keep the codebase debuggable and maintainable, and understanding where coroutines are spawned is a big part of that.

Author:

Yeah, agree. Adding it to the TODO list.
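
As an illustration of the direction agreed on here (not part of this diff), a rough sketch of spawning long-running tasks from the application layer; the wrapper proc and its name are assumptions:

# Hypothetical application-level startup (e.g. in wakunode2), shown only to
# illustrate keeping every asyncSpawn in one visible place.
proc spawnMaintenanceTasks(node: WakuNode) =
  # The relay connectivity loop added in this PR, spawned by the app instead
  # of inside startRelay.
  asyncSpawn node.peerManager.relayConnectivityLoop()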


# Start the WakuRelay protocol
await node.wakuRelay.start()

@@ -897,9 +900,6 @@ proc startKeepalive*(node: WakuNode) =

asyncSpawn node.keepaliveLoop(defaultKeepalive)
Contributor:

This statement is an example of that case: an asyncSpawn within the Waku node module.


# TODO: Decouple discovery logic from connection logic
# A discovered peer goes to the PeerStore
# The PeerManager uses to PeerStore to dial peers
proc runDiscv5Loop(node: WakuNode) {.async.} =
## Continuously add newly discovered nodes
## using Node Discovery v5
@@ -920,12 +920,9 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =

trace "Discovered peers", count=discoveredPeers.get().len()

let newPeers = discoveredPeers.get().filterIt(
not node.switch.isConnected(it.peerId))

if newPeers.len > 0:
debug "Connecting to newly discovered peers", count=newPeers.len()
await node.connectToNodes(newPeers, "discv5")
for peer in discoveredPeers.get():
# TODO: proto: WakuRelayCodec will be removed from add peer
node.peerManager.addPeer(peer, WakuRelayCodec)

# Discovery `queryRandom` can have a synchronous fast path for example
# when no peers are in the routing table. Don't run it in continuous loop.