Skip to content

Commit ff4c433

Browse files
committedNov 16, 2023
chore: mics. improvements to cluster id and shards setup
1 parent a5da1fc commit ff4c433

File tree

10 files changed

+278
-256
lines changed

10 files changed

+278
-256
lines changed
 

‎apps/wakunode2/app.nim

+52-20
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,21 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
127127
quit(QuitFailure)
128128
else: recordRes.get()
129129

130+
# Check the ENR sharding info for matching config cluster id
131+
if conf.clusterId != 0:
132+
let res = record.toTyped()
133+
if res.isErr():
134+
error "ENR setup failed", error = $res.get()
135+
quit(QuitFailure)
136+
137+
let relayShard = res.get().relaySharding().valueOr:
138+
error "no sharding info"
139+
quit(QuitFailure)
140+
141+
if conf.clusterId != relayShard.clusterId:
142+
error "cluster id mismatch"
143+
quit(QuitFailure)
144+
130145
App(
131146
version: git_version,
132147
conf: conf,
@@ -234,7 +249,13 @@ proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 =
234249
autoupdateRecord: app.conf.discv5EnrAutoUpdate,
235250
)
236251

237-
WakuDiscoveryV5.new(app.rng, discv5Conf, some(app.record))
252+
WakuDiscoveryV5.new(
253+
app.rng,
254+
discv5Conf,
255+
some(app.record),
256+
some(app.node.peerManager),
257+
app.node.topicSubscriptionQueue,
258+
)
238259

239260
## Init waku node instance
240261

@@ -286,18 +307,17 @@ proc initNode(conf: WakuNodeConf,
286307
ok(node)
287308

288309
proc setupWakuApp*(app: var App): AppResult[void] =
289-
290-
## Discv5
291-
if app.conf.discv5Discovery:
292-
app.wakuDiscV5 = some(app.setupDiscoveryV5())
293-
294310
## Waku node
295311
let initNodeRes = initNode(app.conf, app.netConf, app.rng, app.key, app.record, app.peerStore, app.dynamicBootstrapNodes)
296312
if initNodeRes.isErr():
297313
return err("failed to init node: " & initNodeRes.error)
298314

299315
app.node = initNodeRes.get()
300316

317+
## Discv5
318+
if app.conf.discv5Discovery:
319+
app.wakuDiscV5 = some(app.setupDiscoveryV5())
320+
301321
ok()
302322

303323
proc getPorts(listenAddrs: seq[MultiAddress]):
@@ -341,7 +361,17 @@ proc updateNetConfig(app: var App): AppResult[void] =
341361
proc updateEnr(app: var App): AppResult[void] =
342362

343363
let record = enrConfiguration(app.conf, app.netConf, app.key).valueOr:
344-
return err(error)
364+
return err("ENR setup failed: " & error)
365+
366+
if app.conf.clusterId != 0:
367+
let tRecord = record.toTyped().valueOr:
368+
return err("ENR setup failed: " & $error)
369+
370+
let relayShard = tRecord.relaySharding().valueOr:
371+
return err("ENR setup failed: no sharding info")
372+
373+
if app.conf.clusterId != relayShard.clusterId:
374+
return err("ENR setup failed: cluster id mismatch")
345375

346376
app.record = record
347377
app.node.enr = record
@@ -377,6 +407,9 @@ proc setupProtocols(node: WakuNode,
377407
## Optionally include persistent message storage.
378408
## No protocols are started yet.
379409

410+
node.mountMetadata(conf.clusterId).isOkOr:
411+
return err("failed to mount waku metadata protocol: " & error)
412+
380413
# Mount relay on all nodes
381414
var peerExchangeHandler = none(RoutingRecordsHandler)
382415
if conf.relayPeerExchange:
@@ -587,25 +620,24 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
587620

588621
proc startApp*(app: var App): AppResult[void] =
589622

590-
try:
591-
(waitFor startNode(app.node,app.conf,app.dynamicBootstrapNodes)).isOkOr:
592-
return err(error)
593-
except CatchableError:
594-
return err("exception starting node: " & getCurrentExceptionMsg())
623+
let nodeRes = catch: (waitFor startNode(app.node,app.conf,app.dynamicBootstrapNodes))
624+
if nodeRes.isErr():
625+
return err("exception starting node: " & nodeRes.error.msg)
626+
627+
nodeRes.get().isOkOr:
628+
return err("exception starting node: " & error)
595629

596630
# Update app data that is set dynamically on node start
597631
app.updateApp().isOkOr:
598632
return err("Error in updateApp: " & $error)
599633

600-
if app.wakuDiscv5.isSome():
601-
let wakuDiscv5 = app.wakuDiscv5.get()
602-
603-
let res = wakuDiscv5.start()
604-
if res.isErr():
605-
return err("failed to start waku discovery v5: " & $res.error)
634+
if (let wakuDiscv5 = app.wakuDiscv5.get(); app.wakuDiscv5.isSome()):
635+
let catchRes = catch: (waitFor wakuDiscv5.start())
636+
let startRes = catchRes.valueOr:
637+
return err("failed to start waku discovery v5: " & catchRes.error.msg)
606638

607-
asyncSpawn wakuDiscv5.searchLoop(app.node.peerManager)
608-
asyncSpawn wakuDiscv5.subscriptionsListener(app.node.topicSubscriptionQueue)
639+
startRes.isOkOr:
640+
return err("failed to start waku discovery v5: " & error)
609641

610642
return ok()
611643

‎examples/publisher.nim

+17-16
Original file line numberDiff line numberDiff line change
@@ -51,31 +51,32 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
5151
var bootstrapNodeEnr: enr.Record
5252
discard bootstrapNodeEnr.fromURI(bootstrapNode)
5353

54+
let discv5Conf = WakuDiscoveryV5Config(
55+
discv5Config: none(DiscoveryConfig),
56+
address: ip,
57+
port: Port(discv5Port),
58+
privateKey: keys.PrivateKey(nodeKey.skkey),
59+
bootstrapRecords: @[bootstrapNodeEnr],
60+
autoupdateRecord: true,
61+
)
62+
5463
# assumes behind a firewall, so not care about being discoverable
5564
let wakuDiscv5 = WakuDiscoveryV5.new(
56-
extIp= none(ValidIpAddress),
57-
extTcpPort = none(Port),
58-
extUdpPort = none(Port),
59-
bindIP = ip,
60-
discv5UdpPort = Port(discv5Port),
61-
bootstrapEnrs = @[bootstrapNodeEnr],
62-
privateKey = keys.PrivateKey(nodeKey.skkey),
63-
flags = flags,
64-
rng = node.rng,
65-
topics = @[],
66-
)
65+
node.rng,
66+
discv5Conf,
67+
some(node.enr),
68+
some(node.peerManager),
69+
node.topicSubscriptionQueue,
70+
)
6771

6872
await node.start()
6973
await node.mountRelay()
7074
node.peerManager.start()
7175

72-
let discv5Res = wakuDiscv5.start()
73-
if discv5Res.isErr():
74-
error "failed to start discv5", error= discv5Res.error
76+
(await wakuDiscv5.start()).isOkOr:
77+
error "failed to start discv5", error = error
7578
quit(1)
7679

77-
asyncSpawn wakuDiscv5.searchLoop(node.peerManager)
78-
7980
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
8081
while true:
8182
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)

‎examples/subscriber.nim

+17-16
Original file line numberDiff line numberDiff line change
@@ -46,31 +46,32 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
4646
var bootstrapNodeEnr: enr.Record
4747
discard bootstrapNodeEnr.fromURI(bootstrapNode)
4848

49+
let discv5Conf = WakuDiscoveryV5Config(
50+
discv5Config: none(DiscoveryConfig),
51+
address: ip,
52+
port: Port(discv5Port),
53+
privateKey: keys.PrivateKey(nodeKey.skkey),
54+
bootstrapRecords: @[bootstrapNodeEnr],
55+
autoupdateRecord: true,
56+
)
57+
4958
# assumes behind a firewall, so not care about being discoverable
5059
let wakuDiscv5 = WakuDiscoveryV5.new(
51-
extIp= none(ValidIpAddress),
52-
extTcpPort = none(Port),
53-
extUdpPort = none(Port),
54-
bindIP = ip,
55-
discv5UdpPort = Port(discv5Port),
56-
bootstrapEnrs = @[bootstrapNodeEnr],
57-
privateKey = keys.PrivateKey(nodeKey.skkey),
58-
flags = flags,
59-
rng = node.rng,
60-
topics = @[],
61-
)
60+
node.rng,
61+
discv5Conf,
62+
some(node.enr),
63+
some(node.peerManager),
64+
node.topicSubscriptionQueue,
65+
)
6266

6367
await node.start()
6468
await node.mountRelay()
6569
node.peerManager.start()
6670

67-
let discv5Res = wakuDiscv5.start()
68-
if discv5Res.isErr():
69-
error "failed to start discv5", error = discv5Res.error
71+
(await wakuDiscv5.start()).isOkOr:
72+
error "failed to start discv5", error = error
7073
quit(1)
7174

72-
asyncSpawn wakuDiscv5.searchLoop(node.peerManager)
73-
7475
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
7576
while true:
7677
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)

‎tests/test_peer_manager.nim

+27-5
Original file line numberDiff line numberDiff line change
@@ -268,16 +268,38 @@ procSuite "Peer Manager":
268268
await allFutures([node1.stop(), node2.stop(), node3.stop()])
269269

270270
asyncTest "Peer manager drops conections to peers on different networks":
271-
let clusterId1 = 1.uint32
272-
let clusterId2 = 2.uint32
271+
let clusterId3 = 3.uint32
272+
let clusterId4 = 4.uint32
273273

274274
let
275275
# different network
276-
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId1)
276+
node1 = newTestWakuNode(
277+
generateSecp256k1Key(),
278+
ValidIpAddress.init("0.0.0.0"),
279+
Port(0),
280+
clusterId = clusterId3,
281+
topics = @["/waku/2/rs/3/0"],
282+
)
277283

278284
# same network
279-
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2)
280-
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2)
285+
node2 = newTestWakuNode(
286+
generateSecp256k1Key(),
287+
ValidIpAddress.init("0.0.0.0"),
288+
Port(0),
289+
clusterId = clusterId4,
290+
topics = @["/waku/2/rs/4/0"],
291+
)
292+
node3 = newTestWakuNode(
293+
generateSecp256k1Key(),
294+
ValidIpAddress.init("0.0.0.0"),
295+
Port(0),
296+
clusterId = clusterId4,
297+
topics = @["/waku/2/rs/4/0"],
298+
)
299+
300+
discard node1.mountMetadata(clusterId3)
301+
discard node2.mountMetadata(clusterId4)
302+
discard node3.mountMetadata(clusterId4)
281303

282304
# Start nodes
283305
await allFutures([node1.start(), node2.start(), node3.start()])

‎tests/test_waku_discv5.nim

+26-21
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,26 @@ proc newTestEnrRecord(privKey: libp2p_keys.PrivateKey,
3434
builder.build().tryGet()
3535

3636

37-
proc newTestDiscv5(privKey: libp2p_keys.PrivateKey,
38-
bindIp: string, tcpPort: uint16, udpPort: uint16,
39-
record: waku_enr.Record,
40-
bootstrapRecords = newSeq[waku_enr.Record]()): WakuDiscoveryV5 =
37+
proc newTestDiscv5(
38+
privKey: libp2p_keys.PrivateKey,
39+
bindIp: string, tcpPort: uint16, udpPort: uint16,
40+
record: waku_enr.Record,
41+
bootstrapRecords = newSeq[waku_enr.Record](),
42+
queue = newAsyncEventQueue[SubscriptionEvent](30),
43+
): WakuDiscoveryV5 =
4144
let config = WakuDiscoveryV5Config(
4245
privateKey: eth_keys.PrivateKey(privKey.skkey),
4346
address: ValidIpAddress.init(bindIp),
4447
port: Port(udpPort),
4548
bootstrapRecords: bootstrapRecords,
4649
)
4750

48-
let discv5 = WakuDiscoveryV5.new(rng(), config, some(record))
51+
let discv5 = WakuDiscoveryV5.new(
52+
rng = rng(),
53+
conf = config,
54+
record = some(record),
55+
queue = queue,
56+
)
4957

5058
return discv5
5159

@@ -122,13 +130,13 @@ procSuite "Waku Discovery v5":
122130
bootstrapRecords = @[record1, record2]
123131
)
124132

125-
let res1 = node1.start()
133+
let res1 = await node1.start()
126134
assert res1.isOk(), res1.error
127135

128-
let res2 = node2.start()
136+
let res2 = await node2.start()
129137
assert res2.isOk(), res2.error
130138

131-
let res3 = node3.start()
139+
let res3 = await node3.start()
132140
assert res3.isOk(), res3.error
133141

134142
## When
@@ -240,16 +248,16 @@ procSuite "Waku Discovery v5":
240248
)
241249

242250
# Start nodes' discoveryV5 protocols
243-
let res1 = node1.start()
251+
let res1 = await node1.start()
244252
assert res1.isOk(), res1.error
245253

246-
let res2 = node2.start()
254+
let res2 = await node2.start()
247255
assert res2.isOk(), res2.error
248256

249-
let res3 = node3.start()
257+
let res3 = await node3.start()
250258
assert res3.isOk(), res3.error
251259

252-
let res4 = node4.start()
260+
let res4 = await node4.start()
253261
assert res4.isOk(), res4.error
254262

255263
## Given
@@ -401,22 +409,20 @@ procSuite "Waku Discovery v5":
401409
udpPort = udpPort,
402410
)
403411

412+
let queue = newAsyncEventQueue[SubscriptionEvent](30)
413+
404414
let node = newTestDiscv5(
405415
privKey = privKey,
406416
bindIp = bindIp,
407417
tcpPort = tcpPort,
408418
udpPort = udpPort,
409-
record = record
419+
record = record,
420+
queue = queue,
410421
)
411422

412-
let res = node.start()
423+
let res = await node.start()
413424
assert res.isOk(), res.error
414425

415-
let queue = newAsyncEventQueue[SubscriptionEvent](0)
416-
417-
## When
418-
asyncSpawn node.subscriptionsListener(queue)
419-
420426
## Then
421427
queue.emit((kind: PubsubSub, topic: shard1))
422428
queue.emit((kind: PubsubSub, topic: shard2))
@@ -442,14 +448,13 @@ procSuite "Waku Discovery v5":
442448

443449
queue.emit((kind: PubsubUnsub, topic: shard1))
444450
queue.emit((kind: PubsubUnsub, topic: shard2))
445-
queue.emit((kind: PubsubUnsub, topic: shard3))
446451

447452
await sleepAsync(1.seconds)
448453

449454
check:
450455
node.protocol.localNode.record.containsShard(shard1) == false
451456
node.protocol.localNode.record.containsShard(shard2) == false
452-
node.protocol.localNode.record.containsShard(shard3) == false
457+
node.protocol.localNode.record.containsShard(shard3) == true
453458

454459
## Cleanup
455460
await node.stop()

0 commit comments

Comments
 (0)