Skip to content

Commit 1cf3a80

Browse files
committed
sharded peer manager
Fix possible out of bound & logic error Filter peers per protocol & rename proc Fix out of bound & refactor dialling Fix catching raise VS timeout & tests fixes Fix test to connect to all peer per proto Fix test Div target per shard count Logging & stuff Fixes Log peer count More logs Remove protobook override & clean up Fix relay peer management & logs Mics Fixes Fixes
1 parent bebaa59 commit 1cf3a80

File tree

6 files changed

+269
-157
lines changed

6 files changed

+269
-157
lines changed

tests/test_peer_manager.nim

+50-18
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{.used.}
22

33
import
4-
std/[options, sequtils, times],
4+
std/[options, sequtils, times, sugar],
55
stew/shims/net as stewNet,
66
testutils/unittests,
77
chronos,
@@ -21,6 +21,7 @@ import
2121
../../waku/node/peer_manager/peer_manager,
2222
../../waku/node/peer_manager/peer_store/waku_peer_storage,
2323
../../waku/waku_node,
24+
../../waku/waku_core/topics,
2425
../../waku/waku_relay,
2526
../../waku/waku_store,
2627
../../waku/waku_filter,
@@ -128,7 +129,6 @@ procSuite "Peer Manager":
128129

129130
await node.stop()
130131

131-
132132
asyncTest "Peer manager keeps track of connections":
133133
# Create 2 nodes
134134
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
@@ -225,18 +225,36 @@ procSuite "Peer Manager":
225225
let
226226
database = SqliteDatabase.new(":memory:")[]
227227
storage = WakuPeerStorage.new(database)[]
228-
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
229-
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
230-
peerInfo2 = node2.switch.peerInfo
228+
node1 = newTestWakuNode(
229+
generateSecp256k1Key(),
230+
ValidIpAddress.init("127.0.0.1"),
231+
Port(44048),
232+
peerStorage = storage
233+
)
234+
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023))
235+
236+
node1.mountMetadata(0).expect("Mounted Waku Metadata")
237+
node2.mountMetadata(0).expect("Mounted Waku Metadata")
231238

232239
await node1.start()
233240
await node2.start()
234241

235242
await node1.mountRelay()
236243
await node2.mountRelay()
244+
245+
let peerInfo2 = node2.switch.peerInfo
246+
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
247+
remotePeerInfo2.enr = some(node2.enr)
237248

238-
require:
239-
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
249+
let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2)
250+
assert is12Connected == true, "Node 1 and 2 not connected"
251+
252+
# When node use 0.0.0.0 and port 0
253+
# After connecting the peer store is updated with the wrong address
254+
check:
255+
node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs
256+
257+
# wait for the peer store update
240258
await sleepAsync(chronos.milliseconds(500))
241259

242260
check:
@@ -246,24 +264,34 @@ procSuite "Peer Manager":
246264
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
247265

248266
# Simulate restart by initialising a new node using the same storage
249-
let
250-
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
267+
let node3 = newTestWakuNode(
268+
generateSecp256k1Key(),
269+
ValidIpAddress.init("127.0.0.1"),
270+
Port(56037),
271+
peerStorage = storage
272+
)
273+
274+
node3.mountMetadata(0).expect("Mounted Waku Metadata")
251275

252276
await node3.start()
277+
253278
check:
254279
# Node2 has been loaded after "restart", but we have not yet reconnected
255280
node3.peerManager.peerStore.peers().len == 1
256281
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
257282
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
258283

259284
await node3.mountRelay()
260-
await node3.peerManager.connectToRelayPeers()
285+
286+
await node3.peerManager.manageRelayPeers()
287+
288+
await sleepAsync(chronos.milliseconds(500))
261289

262290
check:
263291
# Reconnected to node2 after "restart"
264292
node3.peerManager.peerStore.peers().len == 1
265293
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
266-
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
294+
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
267295

268296
await allFutures([node1.stop(), node2.stop(), node3.stop()])
269297

@@ -297,9 +325,9 @@ procSuite "Peer Manager":
297325
topics = @["/waku/2/rs/4/0"],
298326
)
299327

300-
discard node1.mountMetadata(clusterId3)
301-
discard node2.mountMetadata(clusterId4)
302-
discard node3.mountMetadata(clusterId4)
328+
node1.mountMetadata(clusterId3).expect("Mounted Waku Metadata")
329+
node2.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
330+
node3.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
303331

304332
# Start nodes
305333
await allFutures([node1.start(), node2.start(), node3.start()])
@@ -318,7 +346,6 @@ procSuite "Peer Manager":
318346
conn2.isNone
319347
conn3.isSome
320348

321-
322349
# TODO: nwaku/issues/1377
323350
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
324351
let
@@ -380,19 +407,24 @@ procSuite "Peer Manager":
380407
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
381408

382409
# Start them
383-
await allFutures(nodes.mapIt(it.start()))
410+
discard nodes.mapIt(it.mountMetadata(0))
384411
await allFutures(nodes.mapIt(it.mountRelay()))
412+
await allFutures(nodes.mapIt(it.start()))
385413

386414
# Get all peer infos
387-
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
415+
let peerInfos = collect:
416+
for i in 0..nodes.high:
417+
let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo()
418+
peerInfo.enr = some(nodes[i].enr)
419+
peerInfo
388420

389421
# Add all peers (but self) to node 0
390422
nodes[0].peerManager.addPeer(peerInfos[1])
391423
nodes[0].peerManager.addPeer(peerInfos[2])
392424
nodes[0].peerManager.addPeer(peerInfos[3])
393425

394426
# Connect to relay peers
395-
await nodes[0].peerManager.connectToRelayPeers()
427+
await nodes[0].peerManager.manageRelayPeers()
396428

397429
check:
398430
# Peerstore track all three peers

tests/testlib/wakunode.nim

+3-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
6666
if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000))
6767
else: extPort
6868

69-
let conf = defaultTestWakuNodeConf()
69+
var conf = defaultTestWakuNodeConf()
70+
71+
conf.topics = topics
7072

7173
if dns4DomainName.isSome() and extIp.isNone():
7274
# If there's an error resolving the IP, an exception is thrown and test fails

0 commit comments

Comments
 (0)