Skip to content

Commit bfc4e13

Browse files
committed
Fix relay peer management & logs
1 parent 4d85d9b commit bfc4e13

File tree

2 files changed

+60
-35
lines changed

2 files changed

+60
-35
lines changed

waku/node/peer_manager/peer_manager.nim

+57-35
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,14 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownO
132132
peerId = remotePeerInfo.peerId,
133133
addresses = remotePeerInfo.addrs,
134134
origin = origin,
135-
protocols = remotePeerinfo.protocols,
136135
enr = remotePeerInfo.enr
137136

138137
pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
139138
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
140139
pm.peerStore[SourceBook][remotePeerInfo.peerId] = origin
141140

142-
if (let enr = remotePeerInfo.enr.get(); remotePeerInfo.enr.isSome()):
143-
pm.peerStore[ENRBook][remotePeerInfo.peerId] = enr
141+
if remotePeerInfo.enr.isSome():
142+
pm.peerStore[ENRBook][remotePeerInfo.peerId] = remotePeerInfo.enr.get()
144143

145144
# Add peer to storage. Entry will subsequently be updated with connectedness information
146145
if not pm.storage.isNil:
@@ -327,24 +326,38 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
327326
return
328327

329328
let res = catch: await pm.switch.dial(peerId, WakuMetadataCodec)
329+
let conn = res.valueOr:
330+
info "disconnecting from peer", peerId=peerId, reason="dial failed: " & error.msg
331+
asyncSpawn(pm.switch.disconnect(peerId))
332+
pm.peerStore.delete(peerId)
333+
return
334+
335+
let metadata = (await pm.wakuMetadata.request(conn)).valueOr:
336+
info "disconnecting from peer", peerId=peerId, reason="waku metatdata request failed: " & error
337+
asyncSpawn(pm.switch.disconnect(peerId))
338+
pm.peerStore.delete(peerId)
339+
return
330340

331-
let reason =
332-
if (let conn = res; conn.isOk()):
333-
if (let metadata = (await pm.wakuMetadata.request(conn.get())); metadata.isOk()):
334-
if (let clusterId = metadata.get().clusterId; clusterId.isSome()):
335-
if pm.wakuMetadata.clusterId == clusterId.get():
336-
if metadata.get().shards.anyIt(pm.wakuMetadata.shards.contains(it)):
337-
return
338-
else: "no shards in common"
339-
else: "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId
340-
else: "empty clusterId reported"
341-
else: "failed waku metadata codec request"
342-
else: "waku metadata codec not supported"
343-
344-
info "disconnecting from peer", peerId=peerId, reason=reason
345-
asyncSpawn(pm.switch.disconnect(peerId))
346-
pm.peerStore.delete(peerId)
341+
let clusterId = metadata.clusterId.valueOr:
342+
info "disconnecting from peer", peerId=peerId, reason="empty clusterId reported"
343+
asyncSpawn(pm.switch.disconnect(peerId))
344+
pm.peerStore.delete(peerId)
345+
return
346+
347+
if pm.wakuMetadata.clusterId != clusterId:
348+
info "disconnecting from peer",
349+
peerId=peerId,
350+
reason="different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId
351+
asyncSpawn(pm.switch.disconnect(peerId))
352+
pm.peerStore.delete(peerId)
353+
return
347354

355+
if not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)):
356+
info "disconnecting from peer", peerId=peerId, reason="no shard in common"
357+
asyncSpawn(pm.switch.disconnect(peerId))
358+
pm.peerStore.delete(peerId)
359+
return
360+
348361
# called when a peer i) first connects to us ii) disconnects all connections from us
349362
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
350363
if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined:
@@ -586,9 +599,10 @@ proc connectToNodes*(pm: PeerManager,
586599
# later.
587600
await sleepAsync(chronos.seconds(5))
588601

589-
# Returns the peerIds of physical connections (in and out)
590-
# containing at least one stream with the given protocol.
591602
proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
603+
## Returns the peerIds of physical connections (in and out)
604+
## containing at least one stream with the given protocol.
605+
592606
var inPeers: seq[PeerId]
593607
var outPeers: seq[PeerId]
594608

@@ -631,20 +645,22 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} =
631645
var peersToConnect: seq[RemotePeerInfo]
632646
var peersToDisconnect: int
633647

634-
for shard in pm.wakuMetadata.shards.items:
635-
var peers = pm.peerStore.getPeersByShard(uint16(pm.wakuMetadata.clusterId), uint16(shard))
648+
# Get all connected peers for Waku Relay
649+
var (inPeers, outPeers) = pm.connectedPeers(WakuRelayCodec)
636650

637-
peers.keepItIf(it.protocols.contains(WakuRelayCodec))
651+
# Calculate in/out target number of peers for each shards
652+
let inTarget = pm.inRelayPeersTarget div pm.wakuMetadata.shards.len
653+
let outTarget = pm.outRelayPeersTarget div pm.wakuMetadata.shards.len
638654

639-
let connectedInPeers = peers.filterIt(
640-
pm.peerStore.isConnected(it.peerId) and it.direction == Inbound)
641-
642-
let connectedOutPeers = peers.filterIt(
643-
pm.peerStore.isConnected(it.peerId) and it.direction == Outbound)
655+
for shard in pm.wakuMetadata.shards.items:
656+
# Filter out peer not on this shard
657+
let connectedInPeers = inPeers.filterIt(
658+
pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)))
644659

645-
let inTarget = pm.inRelayPeersTarget div pm.wakuMetadata.shards.len
646-
let outTarget = pm.outRelayPeersTarget div pm.wakuMetadata.shards.len
660+
let connectedOutPeers = outPeers.filterIt(
661+
pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)))
647662

663+
# Calculate the difference between current values and targets
648664
let inPeerDiff = connectedInPeers.len - inTarget
649665
let outPeerDiff = outTarget - connectedOutPeers.len
650666

@@ -654,12 +670,17 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} =
654670
if outPeerDiff <= 0:
655671
continue
656672

657-
let connectablePeers = peers.filterIt(
658-
(not pm.peerStore.isConnected(it.peerId)) and pm.canBeConnected(it.peerId))
673+
# Get all connectable relay peers for this shard
674+
var connectablePeers = pm.peerStore.getPeersByShard(
675+
uint16(pm.wakuMetadata.clusterId), uint16(shard))
676+
677+
connectablePeers.keepItIf(
678+
it.protocols.contains(WakuRelayCodec) and
679+
not pm.peerStore.isConnected(it.peerId) and
680+
pm.canBeConnected(it.peerId))
659681

660682
debug "Target Peer Count per Shard",
661-
Shard = shard,
662-
RelayPeers = peers.len,
683+
ShardNumber = shard,
663684
Inbound = $connectedInPeers.len & "/" & $inTarget,
664685
Outbound = $connectedOutPeers.len & "/" & $outTarget,
665686
Connectable = connectablePeers.len
@@ -675,6 +696,7 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} =
675696
# Even with duplicates, after a couple of iteration the target will be reached
676697
let uniquePeers = peersToConnect.deduplicate()
677698

699+
# Connect to all nodes
678700
for i in countup(0, uniquePeers.len, MaxParallelDials):
679701
let stop = min(i + MaxParallelDials, uniquePeers.len)
680702
await pm.connectToNodes(uniquePeers[i..<stop])

waku/node/peer_manager/waku_peer_store.nim

+3
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness =
102102
# TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
103103
return peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
104104

105+
proc hasShard*(peerStore: PeerStore, peerId: PeerID, cluster, shard: uint16): bool =
106+
peerStore[ENRBook][peerId].containsShard(cluster, shard)
107+
105108
proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool =
106109
# Returns `true` if the peer is connected
107110
peerStore.connectedness(peerId) == Connected

0 commit comments

Comments
 (0)