-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathprotocol.nim
140 lines (105 loc) · 3.64 KB
/
protocol.nim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# Turn on exception tracking for everything declared after this point.
# From Nim 1.4 onwards the pushed raises list is empty; on older compilers
# it is `[Defect]`.
when (NimMajor, NimMinor) >= (1, 4):
  {.push raises: [].}
else:
  {.push raises: [Defect].}
import
std/[options, sequtils, random, sets],
stew/results,
chronicles,
chronos,
metrics,
libp2p/protocols/protocol,
libp2p/stream/connection,
libp2p/crypto/crypto,
eth/p2p/discoveryv5/enr
import
../common/nimchronos,
../waku_core,
./rpc
# Log topic attached to the chronicles log statements in this module.
logScope:
  topics = "waku metadata"
const
  WakuMetadataCodec* = "/vac/waku/metadata/1.0.0"
    ## Protocol identifier stored in `WakuMetadata.codec` by
    ## `initProtocolHandler`.
  RpcResponseMaxBytes* = 1024
    ## Size limit handed to `readLp` whenever a metadata message is read.
type
  WakuMetadata* = ref object of LPProtocol
    ## Protocol object for the Waku metadata exchange: holds the local
    ## cluster id and shard set that `request` and `respond` put on the wire.
    clusterId*: uint32
      ## Local cluster id; sent in requests and responses and used by
      ## `subscriptionsListener` to filter subscription events.
    shards*: HashSet[uint32]
      ## Local shard ids; updated by `subscriptionsListener` on
      ## `PubsubSub` / `PubsubUnsub` events.
    topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]
      ## Source of subscription events consumed by `subscriptionsListener`.
proc respond(
    m: WakuMetadata, conn: Connection
): Future[Result[void, string]] {.async, gcsafe.} =
  ## Sends a `WakuMetadataResponse` built from `m.clusterId` and `m.shards`
  ## over `conn` using `writeLP`.
  ##
  ## Returns `ok()` when the write completes, otherwise `err` holding the
  ## message of the exception caught during the write.
  let localShards = toSeq(m.shards)
  let reply = WakuMetadataResponse(
    clusterId: some(m.clusterId),
    shards: localShards
  )

  let writeOutcome = catch: await conn.writeLP(reply.encode().buffer)

  if writeOutcome.isOk():
    return ok()

  return err(writeOutcome.error.msg)
proc request*(
    m: WakuMetadata, conn: Connection
): Future[Result[WakuMetadataResponse, string]] {.async, gcsafe.} =
  ## Sends a `WakuMetadataRequest` carrying `m.clusterId` and `m.shards`
  ## over `conn`, then reads and decodes the peer's `WakuMetadataResponse`.
  ##
  ## `closeWithEof` is always called on `conn` before returning. Failures are
  ## returned as `err("<stage> failed: <reason>")`, checked in the order
  ## close, write, read, decode.
  let request = WakuMetadataRequest(
    clusterId: some(m.clusterId),
    shards: toSeq(m.shards)
  )

  let writeRes = catch: await conn.writeLP(request.encode().buffer)

  if writeRes.isErr():
    # The request was not delivered, so there is no reply to wait for:
    # skip the read, but still close and keep the close-before-write
    # error priority.
    let earlyCloseRes = catch: await conn.closeWithEof()
    if earlyCloseRes.isErr():
      return err("close failed: " & earlyCloseRes.error.msg)
    return err("write failed: " & writeRes.error.msg)

  let readRes = catch: await conn.readLp(RpcResponseMaxBytes)

  # close no matter what
  let closeRes = catch: await conn.closeWithEof()
  if closeRes.isErr():
    return err("close failed: " & closeRes.error.msg)

  let buffer =
    if readRes.isErr():
      return err("read failed: " & readRes.error.msg)
    else: readRes.get()

  let response = WakuMetadataResponse.decode(buffer).valueOr:
    return err("decode failed: " & $error)

  return ok(response)
proc initProtocolHandler*(m: WakuMetadata) =
  ## Installs the stream handler and the `WakuMetadataCodec` codec on `m`.
  proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
    ## Reads one message from `conn`, logs the remote and local
    ## cluster/shard information, replies via `respond` and closes `conn`.
    ## Read and decode failures are logged and end the handler early.
    let res = catch: await conn.readLp(RpcResponseMaxBytes)
    let buffer = res.valueOr:
      error "Connection reading error", error=error.msg
      return

    # NOTE(review): the incoming request is decoded as a
    # `WakuMetadataResponse`; presumably request and response share the same
    # wire layout - confirm against ./rpc.
    let response = WakuMetadataResponse.decode(buffer).valueOr:
      error "Response decoding error", error=error
      return

    debug "Received WakuMetadata request",
      remoteClusterId=response.clusterId,
      remoteShards=response.shards,
      localClusterId=m.clusterId,
      localShards=m.shards

    # Report a failed reply instead of silently dropping the error.
    let respondRes = await m.respond(conn)
    if respondRes.isErr():
      error "Failed to respond to WakuMetadata request",
        error=respondRes.error

    # close, no data is expected
    await conn.closeWithEof()

  m.handler = handle
  m.codec = WakuMetadataCodec
proc new*(T: type WakuMetadata,
          clusterId: uint32,
          queue: AsyncEventQueue[SubscriptionEvent],
          ): T =
  ## Builds a `WakuMetadata` for `clusterId` that will take its subscription
  ## events from `queue`, installs its protocol handler and logs the
  ## creation.
  result = WakuMetadata(topicSubscriptionQueue: queue, clusterId: clusterId)
  result.initProtocolHandler()

  info "Created WakuMetadata protocol", clusterId=clusterId
proc subscriptionsListener(wm: WakuMetadata) {.async.} =
  ## Keeps `wm.shards` in sync with pubsub subscription events.
  ##
  ## Registers on `wm.topicSubscriptionQueue` and, while `wm.started` is
  ## set, processes each batch of events: topics that fail to parse, are not
  ## static-sharding topics, or belong to another cluster are ignored;
  ## `PubsubSub` adds the shard id and `PubsubUnsub` removes it. The queue
  ## registration is released once the loop ends.
  let queueKey = wm.topicSubscriptionQueue.register()

  while wm.started:
    let batch = await wm.topicSubscriptionQueue.waitEvents(queueKey)

    for ev in batch:
      let nsTopic = NsPubsubTopic.parse(ev.topic).valueOr:
        continue

      # `or` short-circuits, so the cluster id is only compared for
      # static-sharding topics.
      if nsTopic.kind != NsPubsubTopicKind.StaticSharding or
          nsTopic.clusterId != wm.clusterId:
        continue

      case ev.kind:
      of PubsubSub:
        wm.shards.incl(nsTopic.shardId)
      of PubsubUnsub:
        wm.shards.excl(nsTopic.shardId)
      else:
        # Other event kinds do not affect the shard set.
        discard

  wm.topicSubscriptionQueue.unregister(queueKey)
proc start*(wm: WakuMetadata) =
  ## Sets `wm.started` and spawns `subscriptionsListener` in the background.
  # The flag is set first: the listener's loop condition reads it.
  wm.started = true

  let listener = wm.subscriptionsListener()
  asyncSpawn listener
proc stop*(wm: WakuMetadata) =
  ## Clears `wm.started`, the flag `subscriptionsListener` checks as its
  ## loop condition.
  wm.started = false