Skip to content

Commit

Permalink
feat: adding validator hooks to pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov committed Dec 14, 2019
1 parent 4d8ede8 commit 73e20f0
Show file tree
Hide file tree
Showing 9 changed files with 358 additions and 124 deletions.
3 changes: 2 additions & 1 deletion libp2p/connection.nim
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,5 @@ method getObservedAddrs*(c: Connection): Future[MultiAddress] {.base, async, gcs
result = c.observedAddrs

proc `$`*(conn: Connection): string =
result = $(conn.peerInfo)
if not isNil(conn.peerInfo):
result = $(conn.peerInfo)
2 changes: 1 addition & 1 deletion libp2p/protocols/identify.nim
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ proc identify*(p: Identify,
if peer != remotePeerInfo.peerId:
trace "Peer ids don't match",
remote = peer.pretty(),
local = remotePeerInfo.get().id
local = remotePeerInfo.id

raise newException(IdentityNoMatchError,
"Peer ids don't match")
Expand Down
30 changes: 16 additions & 14 deletions libp2p/protocols/pubsub/floodsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,31 @@ method subscribeTopic*(f: FloodSub,
# unsubscribe the peer from the topic
f.floodsub[topic].excl(peerId)

method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async, gcsafe.} =
method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async.} =
## handle peer disconnects
for t in f.floodsub.keys:
f.floodsub[t].excl(peer.id)

method rpcHandler*(f: FloodSub,
peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async, gcsafe.} =
trace "processing RPC message", peer = peer.id, msg = rpcMsgs
for m in rpcMsgs: # for all RPC messages
trace "processing message", msg = rpcMsgs
if m.subscriptions.len > 0: # if there are any subscriptions
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
f.subscribeTopic(s.topic, s.subscribe, peer.id)
rpcMsgs: seq[RPCMsg]) {.async.} =
await procCall PubSub(f).rpcHandler(peer, rpcMsgs)

for m in rpcMsgs: # for all RPC messages
if m.messages.len > 0: # if there are any messages
var toSendPeers: HashSet[string] = initHashSet[string]()
for msg in m.messages: # for every message
if msg.topicIDs in f.validators:
trace "process hooks"

if msg.msgId notin f.seen:
f.seen.put(msg.msgId) # add the message to the seen cache

if not msg.verify(peer.peerInfo):
trace "dropping message due to failed signature verification"
continue

if not (await f.validate(msg)):
trace "dropping message due to failed validation"
continue

for t in msg.topicIDs: # for every topic in the message
if t in f.floodsub:
toSendPeers.incl(f.floodsub[t]) # get all the peers interested in this topic
Expand All @@ -82,7 +84,7 @@ method rpcHandler*(f: FloodSub,
await f.peers[p].send(@[RPCMsg(messages: m.messages)])

method init(f: FloodSub) =
proc handler(conn: Connection, proto: string) {.async, gcsafe.} =
proc handler(conn: Connection, proto: string) {.async.} =
## main protocol handler that gets triggered on every
## connection for a protocol string
## e.g. ``/floodsub/1.0.0``, etc...
Expand All @@ -95,7 +97,7 @@ method init(f: FloodSub) =

method publish*(f: FloodSub,
topic: string,
data: seq[byte]) {.async, gcsafe.} =
data: seq[byte]) {.async.} =
await procCall PubSub(f).publish(topic, data)

if data.len <= 0 or topic.len <= 0:
Expand All @@ -113,7 +115,7 @@ method publish*(f: FloodSub,
await f.peers[p].send(@[RPCMsg(messages: @[msg])])

method unsubscribe*(f: FloodSub,
topics: seq[TopicPair]) {.async, gcsafe.} =
topics: seq[TopicPair]) {.async.} =
await procCall PubSub(f).unsubscribe(topics)

for p in f.peers.values:
Expand Down
74 changes: 38 additions & 36 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ proc addInterval(every: Duration, cb: CallbackFunc,
return retFuture

method init(g: GossipSub) =
proc handler(conn: Connection, proto: string) {.async, gcsafe.} =
proc handler(conn: Connection, proto: string) {.async.} =
## main protocol handler that gets triggered on every
## connection for a protocol string
## e.g. ``/floodsub/1.0.0``, etc...
Expand All @@ -84,7 +84,7 @@ method init(g: GossipSub) =
g.handler = handler
g.codec = GossipSubCodec

method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async, gcsafe.} =
method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async.} =
## handle peer disconnects
await procCall FloodSub(g).handleDisconnect(peer)
for t in g.gossipsub.keys:
Expand Down Expand Up @@ -161,16 +161,10 @@ proc handleIWant(g: GossipSub, peer: PubSubPeer, iwants: seq[

method rpcHandler(g: GossipSub,
peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async, gcsafe.} =
rpcMsgs: seq[RPCMsg]) {.async.} =
await procCall PubSub(g).rpcHandler(peer, rpcMsgs)

trace "processing RPC message", peer = peer.id, msg = rpcMsgs
for m in rpcMsgs: # for all RPC messages
trace "processing messages", msg = rpcMsgs
if m.subscriptions.len > 0: # if there are any subscriptions
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
g.subscribeTopic(s.topic, s.subscribe, peer.id)

for m in rpcMsgs: # for all RPC messages
if m.messages.len > 0: # if there are any messages
var toSendPeers: HashSet[string] = initHashSet[string]()
for msg in m.messages: # for every message
Expand All @@ -181,6 +175,14 @@ method rpcHandler(g: GossipSub,

g.seen.put(msg.msgId) # add the message to the seen cache

if not msg.verify(peer.peerInfo):
trace "dropping message due to failed signature verification"
continue

if not (await g.validate(msg)):
trace "dropping message due to failed validation"
continue

# this shouldn't happen
if g.peerInfo.peerId == msg.fromPeerId():
trace "skipping messages from self", msg = msg.msgId
Expand Down Expand Up @@ -227,10 +229,9 @@ method rpcHandler(g: GossipSub,

if respControl.graft.len > 0 or respControl.prune.len > 0 or
respControl.ihave.len > 0 or respControl.iwant.len > 0:
await peer.send(@[RPCMsg(control: some(respControl),
messages: messages)])
await peer.send(@[RPCMsg(control: some(respControl), messages: messages)])

proc replenishFanout(g: GossipSub, topic: string) {.async, gcsafe.} =
proc replenishFanout(g: GossipSub, topic: string) {.async.} =
## get fanout peers for a topic
trace "about to replenish fanout"
if topic notin g.fanout:
Expand All @@ -246,7 +247,7 @@ proc replenishFanout(g: GossipSub, topic: string) {.async, gcsafe.} =

trace "fanout replenished with peers", peers = g.fanout[topic].len

proc rebalanceMesh(g: GossipSub, topic: string) {.async, gcsafe.} =
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
trace "about to rebalance mesh"
# create a mesh topic that we're subscribing to
if topic notin g.mesh:
Expand Down Expand Up @@ -288,7 +289,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async, gcsafe.} =

trace "mesh balanced, got peers", peers = g.mesh[topic].len

proc dropFanoutPeers(g: GossipSub) {.async, gcsafe.} =
proc dropFanoutPeers(g: GossipSub) {.async.} =
# drop peers that we haven't published to in
# GossipSubFanoutTTL seconds
for topic in g.lastFanoutPubSub.keys:
Expand Down Expand Up @@ -334,7 +335,7 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
result[id] = ControlMessage()
result[id].ihave.add(ihave)

proc heartbeat(g: GossipSub) {.async, gcsafe.} =
proc heartbeat(g: GossipSub) {.async.} =
trace "running heartbeat"

await g.heartbeatLock.acquire()
Expand All @@ -353,12 +354,12 @@ proc heartbeat(g: GossipSub) {.async, gcsafe.} =

method subscribe*(g: GossipSub,
topic: string,
handler: TopicHandler) {.async, gcsafe.} =
handler: TopicHandler) {.async.} =
await procCall PubSub(g).subscribe(topic, handler)
asyncCheck g.rebalanceMesh(topic)

method unsubscribe*(g: GossipSub,
topics: seq[TopicPair]) {.async, gcsafe.} =
topics: seq[TopicPair]) {.async.} =
await procCall PubSub(g).unsubscribe(topics)

for pair in topics:
Expand All @@ -372,10 +373,11 @@ method unsubscribe*(g: GossipSub,

method publish*(g: GossipSub,
topic: string,
data: seq[byte]) {.async, gcsafe.} =
data: seq[byte]) {.async.} =
await procCall PubSub(g).publish(topic, data)

trace "about to publish message on topic", name = topic, data = data.toHex()
trace "about to publish message on topic", name = topic,
data = data.toHex()
if data.len > 0 and topic.len > 0:
var peers: HashSet[string]
if topic in g.topics: # if we're subscribed to the topic attempt to build a mesh
Expand Down Expand Up @@ -453,7 +455,7 @@ when isMainModule and not defined(release):

let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[string]()
proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
proc writeHandler(data: seq[byte]) {.async.} =
discard

for i in 0..<15:
Expand All @@ -480,7 +482,7 @@ when isMainModule and not defined(release):

let topic = "foobar"
gossipSub.gossipsub[topic] = initHashSet[string]()
proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
proc writeHandler(data: seq[byte]) {.async.} =
discard

for i in 0..<15:
Expand All @@ -505,12 +507,12 @@ when isMainModule and not defined(release):
let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA)))

proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard

let topic = "foobar"
gossipSub.gossipsub[topic] = initHashSet[string]()
proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
proc writeHandler(data: seq[byte]) {.async.} =
discard

for i in 0..<15:
Expand All @@ -535,13 +537,13 @@ when isMainModule and not defined(release):
let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA)))

proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard

let topic = "foobar"
gossipSub.fanout[topic] = initHashSet[string]()
gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(100.millis)
proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
proc writeHandler(data: seq[byte]) {.async.} =
discard

for i in 0..<6:
Expand All @@ -568,7 +570,7 @@ when isMainModule and not defined(release):
let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA)))

proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard

let topic1 = "foobar1"
Expand All @@ -578,7 +580,7 @@ when isMainModule and not defined(release):
gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(100.millis)
gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(500.millis)

proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
proc writeHandler(data: seq[byte]) {.async.} =
discard

for i in 0..<6:
Expand Down Expand Up @@ -608,10 +610,10 @@ when isMainModule and not defined(release):
let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA)))

proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard

proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
proc writeHandler(data: seq[byte]) {.async.} =
discard

let topic = "foobar"
Expand Down Expand Up @@ -657,10 +659,10 @@ when isMainModule and not defined(release):
let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA)))

proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard

proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
proc writeHandler(data: seq[byte]) {.async.} =
discard

let topic = "foobar"
Expand Down Expand Up @@ -689,10 +691,10 @@ when isMainModule and not defined(release):
let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA)))

proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard

proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
proc writeHandler(data: seq[byte]) {.async.} =
discard

let topic = "foobar"
Expand Down Expand Up @@ -721,10 +723,10 @@ when isMainModule and not defined(release):
let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA)))

proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard

proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
proc writeHandler(data: seq[byte]) {.async.} =
discard

let topic = "foobar"
Expand Down
Loading

0 comments on commit 73e20f0

Please sign in to comment.