Skip to content

Commit bb92436

Browse files
committed
refactor: modify reconnectPeers and unittest
1 parent 80c3114 commit bb92436

File tree

2 files changed

+50
-11
lines changed

2 files changed

+50
-11
lines changed

tests/v2/test_peer_manager.nim

+45
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import
55
stew/shims/net as stewNet,
66
testutils/unittests,
77
chronicles,
8+
chronos,
89
json_rpc/rpcserver,
910
json_rpc/rpcclient,
1011
eth/keys,
@@ -269,3 +270,47 @@ procSuite "Peer Manager":
269270
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
270271

271272
await allFutures([node1.stop(), node2.stop(), node3.stop()])
273+
274+
asyncTest "Peer manager connects to all peers supporting a given protocol":
275+
# Create 4 nodes
276+
var nodes: seq[WakuNode]
277+
for i in 0..<4:
278+
let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
279+
let node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60860 + i))
280+
nodes &= node
281+
282+
# Start them
283+
await allFutures(nodes.mapIt(it.start()))
284+
await allFutures(nodes.mapIt(it.mountRelay()))
285+
286+
# Get all peer infos
287+
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
288+
289+
# Add all peers (but self) to node 0
290+
nodes[0].peerManager.addPeer(peerInfos[1], WakuRelayCodec)
291+
nodes[0].peerManager.addPeer(peerInfos[2], WakuRelayCodec)
292+
nodes[0].peerManager.addPeer(peerInfos[3], WakuRelayCodec)
293+
294+
# Attempt to connect to all known peers supporting a given protocol
295+
await nodes[0].peerManager.reconnectPeers(WakuRelayCodec, protocolMatcher(WakuRelayCodec))
296+
297+
check:
298+
# Peerstore track all three peers
299+
nodes[0].peerManager.peerStore.peers().len == 3
300+
301+
# All peer ids are correct
302+
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].switch.peerInfo.peerId)
303+
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[2].switch.peerInfo.peerId)
304+
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[3].switch.peerInfo.peerId)
305+
306+
# All peers support the relay protocol
307+
nodes[0].peerManager.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains(WakuRelayCodec)
308+
nodes[0].peerManager.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains(WakuRelayCodec)
309+
nodes[0].peerManager.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains(WakuRelayCodec)
310+
311+
# All peers are connected
312+
nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].switch.peerInfo.peerId] == Connected
313+
nodes[0].peerManager.peerStore[ConnectionBook][nodes[2].switch.peerInfo.peerId] == Connected
314+
nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] == Connected
315+
316+
await allFutures(nodes.mapIt(it.stop()))

waku/v2/node/peer_manager/peer_manager.nim

+5-11
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,6 @@ proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): Peer
158158
# Manager interface #
159159
#####################
160160

161-
# TODO: Move to peer store and unit test
162161
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
163162
# Adds peer to manager for the specified protocol
164163

@@ -195,31 +194,26 @@ proc reconnectPeers*(pm: PeerManager,
195194
debug "Reconnecting peers", proto=proto
196195

197196
for storedInfo in pm.peerStore.peers(protocolMatcher):
198-
# Check that peer exists and can be connected
199-
if storedInfo.peerId notin pm.peerStore[ConnectionBook] or
200-
pm.peerStore[ConnectionBook][storedInfo.peerId] == CannotConnect:
197+
# Check that the peer can be connected
198+
if storedInfo.connection == CannotConnect:
201199
debug "Not reconnecting to unreachable or non-existing peer", peerId=storedInfo.peerId
202200
continue
203201

204202
# Respect optional backoff period where applicable.
205203
let
206-
disconnectTime = Moment.init(pm.peerStore[DisconnectBook][storedInfo.peerId], Second) # Convert
204+
# TODO: Add method to peerStore (eg isBackoffExpired())
205+
disconnectTime = Moment.init(storedInfo.disconnectTime, Second) # Convert
207206
currentTime = Moment.init(getTime().toUnix, Second) # Current time comparable to persisted value
208207
backoffTime = disconnectTime + backoff - currentTime # Consider time elapsed since last disconnect
209208

210209
trace "Respecting backoff", backoff=backoff, disconnectTime=disconnectTime, currentTime=currentTime, backoffTime=backoffTime
211210

211+
# TODO: This blocks the whole function. Try to connect to another peer in the meantime.
212212
if backoffTime > ZeroDuration:
213213
debug "Backing off before reconnect...", peerId=storedInfo.peerId, backoffTime=backoffTime
214214
# We disconnected recently and still need to wait for a backoff period before connecting
215215
await sleepAsync(backoffTime)
216216

217-
# Add to protos for peer, if it has not been added yet
218-
if not pm.peerStore.get(storedInfo.peerId).protos.contains(proto):
219-
let remotePeerInfo = storedInfo.toRemotePeerInfo()
220-
trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
221-
pm.addPeer(remotePeerInfo, proto)
222-
223217
trace "Reconnecting to peer", peerId=storedInfo.peerId
224218
discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto)
225219

0 commit comments

Comments
 (0)