Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: metadata protocol shard subscription #2149

Merged
merged 1 commit into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions tests/test_waku_metadata.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import
eth/p2p/discoveryv5/enr
import
../../waku/waku_node,
../../waku/waku_core/topics,
../../waku/node/peer_manager,
../../waku/waku_discv5,
../../waku/waku_metadata,
Expand All @@ -23,8 +24,6 @@ import


procSuite "Waku Metadata Protocol":

# TODO: Add tests with shards when ready
asyncTest "request() returns the supported metadata of the peer":
let clusterId = 10.uint32
let
Expand All @@ -34,6 +33,9 @@ procSuite "Waku Metadata Protocol":
# Start nodes
await allFutures([node1.start(), node2.start()])

node1.topicSubscriptionQueue.emit((kind: PubsubSub, topic: "/waku/2/rs/10/7"))
node1.topicSubscriptionQueue.emit((kind: PubsubSub, topic: "/waku/2/rs/10/6"))

# Create connection
let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec)
require:
Expand All @@ -48,3 +50,5 @@ procSuite "Waku Metadata Protocol":

check:
response1.get().clusterId.get() == clusterId
response1.get().shards == @[uint32(6), uint32(7)]

4 changes: 3 additions & 1 deletion waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ proc new*(T: type WakuNode,
)

# mount metadata protocol
let metadata = WakuMetadata.new(netConfig.clusterId)
let metadata = WakuMetadata.new(netConfig.clusterId, queue)
node.switch.mount(metadata, protocolMatcher(WakuMetadataCodec))
node.wakuMetadata = metadata
peerManager.wakuMetadata = metadata
Expand Down Expand Up @@ -1127,6 +1127,8 @@ proc start*(node: WakuNode) {.async.} =

node.started = true

node.wakuMetadata.start()

info "Node started successfully"

proc stop*(node: WakuNode) {.async.} =
Expand Down
125 changes: 81 additions & 44 deletions waku/waku_metadata/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ else:
{.push raises: [].}

import
std/[options, sequtils, random],
std/[options, sequtils, random, sets],
stew/results,
chronicles,
chronos,
Expand All @@ -27,57 +27,56 @@ const RpcResponseMaxBytes* = 1024
type
WakuMetadata* = ref object of LPProtocol
clusterId*: uint32
shards*: seq[uint32]
shards*: HashSet[uint32]
topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]

proc respond(m: WakuMetadata, conn: Connection): Future[Result[void, string]] {.async, gcsafe.} =
try:
await conn.writeLP(WakuMetadataResponse(
clusterId: some(m.clusterId),
shards: m.shards
).encode().buffer)
except CatchableError as exc:
return err(exc.msg)
let response = WakuMetadataResponse(
clusterId: some(m.clusterId),
shards: toSeq(m.shards)
)

let res = catch: await conn.writeLP(response.encode().buffer)
if res.isErr():
return err(res.error.msg)

return ok()

proc request*(m: WakuMetadata, conn: Connection): Future[Result[WakuMetadataResponse, string]] {.async, gcsafe.} =
var buffer: seq[byte]
var error: string
try:
await conn.writeLP(WakuMetadataRequest(
clusterId: some(m.clusterId),
shards: m.shards,
).encode().buffer)
buffer = await conn.readLp(RpcResponseMaxBytes)
except CatchableError as exc:
error = $exc.msg
finally:
# close, no more data is expected
await conn.closeWithEof()
let request = WakuMetadataRequest(clusterId: some(m.clusterId), shards: toSeq(m.shards))

let writeRes = catch: await conn.writeLP(request.encode().buffer)
let readRes = catch: await conn.readLp(RpcResponseMaxBytes)

# close no watter what
let closeRes = catch: await conn.closeWithEof()
if closeRes.isErr():
return err("close failed: " & closeRes.error.msg)

if error.len > 0:
return err("write/read failed: " & error)
if writeRes.isErr():
return err("write failed: " & writeRes.error.msg)

let decodedBuff = WakuMetadataResponse.decode(buffer)
if decodedBuff.isErr():
return err("decode failed: " & $decodedBuff.error)
let buffer =
if readRes.isErr():
return err("read failed: " & readRes.error.msg)
else: readRes.get()

echo decodedBuff.get().clusterId
return ok(decodedBuff.get())
let response = WakuMetadataResponse.decode(buffer).valueOr:
return err("decode failed: " & $error)

return ok(response)

proc initProtocolHandler*(m: WakuMetadata) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var buffer: seq[byte]
try:
buffer = await conn.readLp(RpcResponseMaxBytes)
except CatchableError as exc:
let res = catch: await conn.readLp(RpcResponseMaxBytes)
let buffer = res.valueOr:
error "Connection reading error", error=error.msg
return

let decBuf = WakuMetadataResponse.decode(buffer)
if decBuf.isErr():
let response = WakuMetadataResponse.decode(buffer).valueOr:
error "Response decoding error", error=error
return

let response = decBuf.get()
debug "Received WakuMetadata request",
remoteClusterId=response.clusterId,
remoteShards=response.shards,
Expand All @@ -92,12 +91,50 @@ proc initProtocolHandler*(m: WakuMetadata) =
m.handler = handle
m.codec = WakuMetadataCodec

proc new*(T: type WakuMetadata, clusterId: uint32): T =
let m = WakuMetadata(
clusterId: clusterId,
# TODO: must be updated real time
shards: @[],
)
m.initProtocolHandler()
proc new*(T: type WakuMetadata,
clusterId: uint32,
queue: AsyncEventQueue[SubscriptionEvent],
): T =
let wm = WakuMetadata(clusterId: clusterId, topicSubscriptionQueue: queue)

wm.initProtocolHandler()

info "Created WakuMetadata protocol", clusterId=clusterId
return m

return wm

proc subscriptionsListener(wm: WakuMetadata) {.async.} =
## Listen for pubsub topics subscriptions changes

let key = wm.topicSubscriptionQueue.register()

while wm.started:
let events = await wm.topicSubscriptionQueue.waitEvents(key)

for event in events:
let parsedTopic = NsPubsubTopic.parse(event.topic).valueOr:
continue

if parsedTopic.kind != NsPubsubTopicKind.StaticSharding:
continue

if parsedTopic.clusterId != wm.clusterId:
continue

case event.kind:
of PubsubSub:
wm.shards.incl(parsedTopic.shardId)
of PubsubUnsub:
wm.shards.excl(parsedTopic.shardId)
else:
continue

wm.topicSubscriptionQueue.unregister(key)

proc start*(wm: WakuMetadata) =
wm.started = true

asyncSpawn wm.subscriptionsListener()

proc stop*(wm: WakuMetadata) =
wm.started = false