Skip to content

Commit 8b9a09b

Browse files
Revert "feat: shard aware peer management (#2151)"
This reverts commit dba9820. We need to revert this commit because the waku-simulator stopped working. i.e. the nodes couldn't establish connections among them: https://github.com/waku-org/waku-simulator/tree/054ba9e33f4fdcdb590bcfe760a5254069c5cb9f Also, the following js-waku test fails due to this commit: "same cluster, different shard: nodes connect"
1 parent a1b27ed commit 8b9a09b

File tree

17 files changed

+216
-344
lines changed

17 files changed

+216
-344
lines changed

apps/chat2/chat2.nim

+4-2
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ when (NimMajor, NimMinor) < (1, 4):
99
else:
1010
{.push raises: [].}
1111

12-
import std/[strformat, strutils, times, options, random]
12+
import std/[strformat, strutils, times, json, options, random]
1313
import confutils, chronicles, chronos, stew/shims/net as stewNet,
1414
eth/keys, bearssl, stew/[byteutils, results],
15+
nimcrypto/pbkdf2,
1516
metrics,
1617
metrics/chronos_httpserver
1718
import libp2p/[switch, # manage transports, a single entry point for dialing and listening
@@ -21,10 +22,11 @@ import libp2p/[switch, # manage transports, a single entry poi
2122
peerinfo, # manage the information of a peer, such as peer ID and public / private key
2223
peerid, # Implement how peers interact
2324
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
25+
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
2426
nameresolving/dnsresolver]# define DNS resolution
2527
import
2628
../../waku/waku_core,
27-
../../waku/waku_lightpush/common,
29+
../../waku/waku_lightpush,
2830
../../waku/waku_lightpush/rpc,
2931
../../waku/waku_filter,
3032
../../waku/waku_enr,

apps/wakunode2/app.nim

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ import
5353
../../waku/waku_peer_exchange,
5454
../../waku/waku_rln_relay,
5555
../../waku/waku_store,
56-
../../waku/waku_lightpush/common,
56+
../../waku/waku_lightpush,
5757
../../waku/waku_filter,
5858
../../waku/waku_filter_v2,
5959
./wakunode2_validator_signed,

tests/test_peer_manager.nim

+39-79
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{.used.}
22

33
import
4-
std/[options, sequtils, times, sugar],
4+
std/[options, sequtils, times],
55
stew/shims/net as stewNet,
66
testutils/unittests,
77
chronos,
@@ -21,12 +21,10 @@ import
2121
../../waku/node/peer_manager/peer_manager,
2222
../../waku/node/peer_manager/peer_store/waku_peer_storage,
2323
../../waku/waku_node,
24-
../../waku/waku_core,
25-
../../waku/waku_enr/capabilities,
26-
../../waku/waku_relay/protocol,
27-
../../waku/waku_store/common,
28-
../../waku/waku_filter/protocol,
29-
../../waku/waku_lightpush/common,
24+
../../waku/waku_relay,
25+
../../waku/waku_store,
26+
../../waku/waku_filter,
27+
../../waku/waku_lightpush,
3028
../../waku/waku_peer_exchange,
3129
../../waku/waku_metadata,
3230
./testlib/common,
@@ -37,7 +35,7 @@ import
3735
procSuite "Peer Manager":
3836
asyncTest "connectRelay() works":
3937
# Create 2 nodes
40-
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
38+
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
4139
await allFutures(nodes.mapIt(it.start()))
4240

4341
let connOk = await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())
@@ -50,7 +48,7 @@ procSuite "Peer Manager":
5048

5149
asyncTest "dialPeer() works":
5250
# Create 2 nodes
53-
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
51+
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
5452

5553
await allFutures(nodes.mapIt(it.start()))
5654
await allFutures(nodes.mapIt(it.mountRelay()))
@@ -78,7 +76,7 @@ procSuite "Peer Manager":
7876

7977
asyncTest "dialPeer() fails gracefully":
8078
# Create 2 nodes and start them
81-
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
79+
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
8280
await allFutures(nodes.mapIt(it.start()))
8381
await allFutures(nodes.mapIt(it.mountRelay()))
8482

@@ -101,7 +99,7 @@ procSuite "Peer Manager":
10199

102100
asyncTest "Adding, selecting and filtering peers work":
103101
let
104-
node = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
102+
node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
105103

106104
# Create filter peer
107105
filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
@@ -130,9 +128,10 @@ procSuite "Peer Manager":
130128

131129
await node.stop()
132130

131+
133132
asyncTest "Peer manager keeps track of connections":
134133
# Create 2 nodes
135-
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
134+
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
136135

137136
await allFutures(nodes.mapIt(it.start()))
138137
await allFutures(nodes.mapIt(it.mountRelay()))
@@ -176,7 +175,7 @@ procSuite "Peer Manager":
176175

177176
asyncTest "Peer manager updates failed peers correctly":
178177
# Create 2 nodes
179-
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
178+
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
180179

181180
await allFutures(nodes.mapIt(it.start()))
182181
await allFutures(nodes.mapIt(it.mountRelay()))
@@ -226,34 +225,18 @@ procSuite "Peer Manager":
226225
let
227226
database = SqliteDatabase.new(":memory:")[]
228227
storage = WakuPeerStorage.new(database)[]
229-
node1 = newTestWakuNode(
230-
generateSecp256k1Key(),
231-
parseIpAddress("127.0.0.1"),
232-
Port(44048),
233-
peerStorage = storage
234-
)
235-
node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(34023))
236-
237-
node1.mountMetadata(0).expect("Mounted Waku Metadata")
238-
node2.mountMetadata(0).expect("Mounted Waku Metadata")
228+
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
229+
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
230+
peerInfo2 = node2.switch.peerInfo
239231

240232
await node1.start()
241233
await node2.start()
242234

243235
await node1.mountRelay()
244236
await node2.mountRelay()
245-
246-
let peerInfo2 = node2.switch.peerInfo
247-
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
248-
remotePeerInfo2.enr = some(node2.enr)
249-
250-
let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2)
251-
assert is12Connected == true, "Node 1 and 2 not connected"
252237

253-
check:
254-
node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs
255-
256-
# wait for the peer store update
238+
require:
239+
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
257240
await sleepAsync(chronos.milliseconds(500))
258241

259242
check:
@@ -263,28 +246,18 @@ procSuite "Peer Manager":
263246
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
264247

265248
# Simulate restart by initialising a new node using the same storage
266-
let node3 = newTestWakuNode(
267-
generateSecp256k1Key(),
268-
parseIpAddress("127.0.0.1"),
269-
Port(56037),
270-
peerStorage = storage
271-
)
272-
273-
node3.mountMetadata(0).expect("Mounted Waku Metadata")
249+
let
250+
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
274251

275252
await node3.start()
276-
277253
check:
278254
# Node2 has been loaded after "restart", but we have not yet reconnected
279255
node3.peerManager.peerStore.peers().len == 1
280256
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
281257
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
282258

283259
await node3.mountRelay()
284-
285-
await node3.peerManager.manageRelayPeers()
286-
287-
await sleepAsync(chronos.milliseconds(500))
260+
await node3.peerManager.connectToRelayPeers()
288261

289262
check:
290263
# Reconnected to node2 after "restart"
@@ -302,7 +275,7 @@ procSuite "Peer Manager":
302275
# different network
303276
node1 = newTestWakuNode(
304277
generateSecp256k1Key(),
305-
parseIpAddress("0.0.0.0"),
278+
ValidIpAddress.init("0.0.0.0"),
306279
Port(0),
307280
clusterId = clusterId3,
308281
topics = @["/waku/2/rs/3/0"],
@@ -311,22 +284,22 @@ procSuite "Peer Manager":
311284
# same network
312285
node2 = newTestWakuNode(
313286
generateSecp256k1Key(),
314-
parseIpAddress("0.0.0.0"),
287+
ValidIpAddress.init("0.0.0.0"),
315288
Port(0),
316289
clusterId = clusterId4,
317290
topics = @["/waku/2/rs/4/0"],
318291
)
319292
node3 = newTestWakuNode(
320293
generateSecp256k1Key(),
321-
parseIpAddress("0.0.0.0"),
294+
ValidIpAddress.init("0.0.0.0"),
322295
Port(0),
323296
clusterId = clusterId4,
324297
topics = @["/waku/2/rs/4/0"],
325298
)
326299

327-
node1.mountMetadata(clusterId3).expect("Mounted Waku Metadata")
328-
node2.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
329-
node3.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
300+
discard node1.mountMetadata(clusterId3)
301+
discard node2.mountMetadata(clusterId4)
302+
discard node3.mountMetadata(clusterId4)
330303

331304
# Start nodes
332305
await allFutures([node1.start(), node2.start(), node3.start()])
@@ -345,13 +318,14 @@ procSuite "Peer Manager":
345318
conn2.isNone
346319
conn3.isSome
347320

321+
348322
# TODO: nwaku/issues/1377
349323
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
350324
let
351325
database = SqliteDatabase.new(":memory:")[]
352326
storage = WakuPeerStorage.new(database)[]
353-
node1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0), peerStorage = storage)
354-
node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
327+
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
328+
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
355329
peerInfo2 = node2.switch.peerInfo
356330
betaCodec = "/vac/waku/relay/2.0.0-beta2"
357331
stableCodec = "/vac/waku/relay/2.0.0"
@@ -375,7 +349,7 @@ procSuite "Peer Manager":
375349

376350
# Simulate restart by initialising a new node using the same storage
377351
let
378-
node3 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0), peerStorage = storage)
352+
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
379353

380354
await node3.mountRelay()
381355
node3.wakuRelay.codec = stableCodec
@@ -403,36 +377,22 @@ procSuite "Peer Manager":
403377

404378
asyncTest "Peer manager connects to all peers supporting a given protocol":
405379
# Create 4 nodes
406-
let nodes =
407-
toSeq(0..<4)
408-
.mapIt(
409-
newTestWakuNode(
410-
nodeKey = generateSecp256k1Key(),
411-
bindIp = parseIpAddress("0.0.0.0"),
412-
bindPort = Port(0),
413-
wakuFlags = some(CapabilitiesBitfield.init(@[Relay]))
414-
)
415-
)
380+
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
416381

417382
# Start them
418-
discard nodes.mapIt(it.mountMetadata(0))
419-
await allFutures(nodes.mapIt(it.mountRelay()))
420383
await allFutures(nodes.mapIt(it.start()))
384+
await allFutures(nodes.mapIt(it.mountRelay()))
421385

422386
# Get all peer infos
423-
let peerInfos = collect:
424-
for i in 0..nodes.high:
425-
let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo()
426-
peerInfo.enr = some(nodes[i].enr)
427-
peerInfo
387+
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
428388

429389
# Add all peers (but self) to node 0
430390
nodes[0].peerManager.addPeer(peerInfos[1])
431391
nodes[0].peerManager.addPeer(peerInfos[2])
432392
nodes[0].peerManager.addPeer(peerInfos[3])
433393

434394
# Connect to relay peers
435-
await nodes[0].peerManager.manageRelayPeers()
395+
await nodes[0].peerManager.connectToRelayPeers()
436396

437397
check:
438398
# Peerstore track all three peers
@@ -457,7 +417,7 @@ procSuite "Peer Manager":
457417

458418
asyncTest "Peer store keeps track of incoming connections":
459419
# Create 4 nodes
460-
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
420+
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
461421

462422
# Start them
463423
await allFutures(nodes.mapIt(it.start()))
@@ -520,7 +480,7 @@ procSuite "Peer Manager":
520480
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"
521481

522482
let
523-
node = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
483+
node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
524484
peers = toSeq(1..5)
525485
.mapIt(
526486
parsePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it)
@@ -562,7 +522,7 @@ procSuite "Peer Manager":
562522

563523
asyncTest "connectedPeers() returns expected number of connections per protocol":
564524
# Create 4 nodes
565-
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
525+
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
566526

567527
# Start them with relay + filter
568528
await allFutures(nodes.mapIt(it.start()))
@@ -613,7 +573,7 @@ procSuite "Peer Manager":
613573

614574
asyncTest "getNumStreams() returns expected number of connections per protocol":
615575
# Create 2 nodes
616-
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
576+
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
617577

618578
# Start them with relay + filter
619579
await allFutures(nodes.mapIt(it.start()))
@@ -839,7 +799,7 @@ procSuite "Peer Manager":
839799

840800
asyncTest "colocationLimit is enforced by pruneConnsByIp()":
841801
# Create 5 nodes
842-
let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
802+
let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
843803

844804
# Start them with relay + filter
845805
await allFutures(nodes.mapIt(it.start()))

tests/test_waku_lightpush.nim

-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import
1111
../../waku/node/peer_manager,
1212
../../waku/waku_core,
1313
../../waku/waku_lightpush,
14-
../../waku/waku_lightpush/common,
1514
../../waku/waku_lightpush/client,
1615
../../waku/waku_lightpush/protocol_metrics,
1716
../../waku/waku_lightpush/rpc,

tests/test_wakunode_lightpush.nim

+6-2
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@ import
44
std/options,
55
stew/shims/net as stewNet,
66
testutils/unittests,
7-
chronos
7+
chronicles,
8+
chronos,
9+
libp2p/crypto/crypto,
10+
libp2p/switch
811
import
912
../../waku/waku_core,
10-
../../waku/waku_lightpush/common,
13+
../../waku/waku_lightpush,
1114
../../waku/node/peer_manager,
1215
../../waku/waku_node,
16+
./testlib/common,
1317
./testlib/wakucore,
1418
./testlib/wakunode
1519

tests/testlib/wakunode.nim

+4-8
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
3232
dnsAddrsNameServers: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
3333
nat: "any",
3434
maxConnections: 50,
35-
clusterId: 1.uint32,
36-
topics: @["/waku/2/rs/1/0"],
35+
topics: @[],
3736
relay: true
3837
)
3938

@@ -56,8 +55,8 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
5655
dns4DomainName = none(string),
5756
discv5UdpPort = none(Port),
5857
agentString = none(string),
59-
clusterId: uint32 = 1.uint32,
60-
topics: seq[string] = @["/waku/2/rs/1/0"],
58+
clusterId: uint32 = 2.uint32,
59+
topics: seq[string] = @["/waku/2/rs/2/0"],
6160
peerStoreCapacity = none(int)): WakuNode =
6261

6362
var resolvedExtIp = extIp
@@ -67,10 +66,7 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
6766
if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000))
6867
else: extPort
6968

70-
var conf = defaultTestWakuNodeConf()
71-
72-
conf.clusterId = clusterId
73-
conf.topics = topics
69+
let conf = defaultTestWakuNodeConf()
7470

7571
if dns4DomainName.isSome() and extIp.isNone():
7672
# If there's an error resolving the IP, an exception is thrown and test fails

0 commit comments

Comments
 (0)