Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix keepalive for connected peers #588

Merged
merged 2 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions examples/v2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,11 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
await node.start()

if conf.filternode != "":
node.mountRelay(conf.topics.split(" "), rlnRelayEnabled = conf.rlnRelay, keepAlive = conf.keepAlive)
node.mountRelay(conf.topics.split(" "), rlnRelayEnabled = conf.rlnRelay)
else:
node.mountRelay(@[], rlnRelayEnabled = conf.rlnRelay, keepAlive = conf.keepAlive)
node.mountRelay(@[], rlnRelayEnabled = conf.rlnRelay)

node.mountKeepalive()

let nick = await readNick(transp)
echo "Welcome, " & nick & "!"
Expand Down Expand Up @@ -377,6 +379,9 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =

await chat.readWriteLoop()

if conf.keepAlive:
node.startKeepalive()

runForever()
#await allFuturesThrowing(libp2pFuts)

Expand Down
3 changes: 2 additions & 1 deletion tests/all_tests_v2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import
./v2/test_web3, # TODO remove it when rln-relay tests get finalized
./v2/test_waku_rln_relay,
./v2/test_waku_bridge,
./v2/test_peer_storage
./v2/test_peer_storage,
./v2/test_waku_keepalive

# TODO Only enable this once swap module is integrated more nicely as a dependency, i.e. as submodule with CI etc
# For PoC execute it manually and run separate module here: https://github.com/vacp2p/swap-contracts-module
Expand Down
52 changes: 52 additions & 0 deletions tests/v2/test_waku_keepalive.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
{.used.}

import
std/[options, tables, sets],
testutils/unittests, chronos, chronicles,
stew/shims/net as stewNet,
libp2p/switch,
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/multistream,
../../waku/v2/node/wakunode2,
../../waku/v2/protocol/waku_keepalive/waku_keepalive,
../test_helpers, ./utils

procSuite "Waku Keepalive":

  asyncTest "handle keepalive":
    # Two nodes bound to different local ports, each with its own fresh key.
    let senderKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
    let sender = WakuNode.init(senderKey, ValidIpAddress.init("0.0.0.0"), Port(60000))
    let receiverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
    let receiver = WakuNode.init(receiverKey, ValidIpAddress.init("0.0.0.0"), Port(60002))

    await sender.start()
    sender.mountRelay()
    sender.mountKeepalive()

    await receiver.start()
    receiver.mountRelay()
    receiver.mountKeepalive()

    await sender.connectToNodes(@[receiver.peerInfo])

    # Completed by the receiver-side handler once a keepalive stream arrives.
    let keepaliveSeen = newFuture[bool]()

    proc onKeepalive(conn: Connection, proto: string) {.async, gcsafe, closure.} =
      debug "WakuKeepalive message received"

      check proto == waku_keepalive.WakuKeepaliveCodec

      keepaliveSeen.complete(true)

    # Replace the receiver's keepalive handler so the test can observe the call.
    receiver.wakuKeepalive.handler = onKeepalive

    sender.startKeepalive()

    # `withTimeout` yields whether the future completed within the time limit.
    let arrivedInTime = await keepaliveSeen.withTimeout(5.seconds)
    check arrivedInTime == true

    await allFutures([sender.stop(), receiver.stop()])
2 changes: 1 addition & 1 deletion waku/v2/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ let
# Helper functions #
####################

proc toPeerInfo(storedInfo: StoredInfo): PeerInfo =
proc toPeerInfo*(storedInfo: StoredInfo): PeerInfo =
  ## Builds a `PeerInfo` from the peer ID, addresses and protocols held in
  ## `storedInfo`.
  let
    knownAddrs = toSeq(storedInfo.addrs)
    knownProtos = toSeq(storedInfo.protos)

  result = PeerInfo.init(peerId = storedInfo.peerId,
                         addrs = knownAddrs,
                         protocols = knownProtos)
Expand Down
38 changes: 33 additions & 5 deletions waku/v2/node/wakunode2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import
../protocol/waku_filter/waku_filter,
../protocol/waku_rln_relay/[rln,waku_rln_relay_utils],
../protocol/waku_lightpush/waku_lightpush,
../protocol/waku_keepalive/waku_keepalive,
../utils/peers,
./storage/message/message_store,
./storage/peer/peer_storage,
Expand Down Expand Up @@ -62,6 +63,7 @@ type
wakuSwap*: WakuSwap
wakuRlnRelay*: WakuRLNRelay
wakuLightPush*: WakuLightPush
wakuKeepalive*: WakuKeepalive
peerInfo*: PeerInfo
libp2pTransportLoops*: seq[Future[void]]
# TODO Revisit messages field indexing as well as if this should be Message or WakuMessage
Expand Down Expand Up @@ -456,7 +458,6 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string) =
proc mountRelay*(node: WakuNode,
topics: seq[string] = newSeq[string](),
rlnRelayEnabled = false,
keepAlive = false,
relayMessages = true,
triggerSelf = true) {.gcsafe.} =
let wakuRelay = WakuRelay.init(
Expand All @@ -468,7 +469,7 @@ proc mountRelay*(node: WakuNode,
verifySignature = false
)

info "mounting relay", rlnRelayEnabled=rlnRelayEnabled, keepAlive=keepAlive, relayMessages=relayMessages
info "mounting relay", rlnRelayEnabled=rlnRelayEnabled, relayMessages=relayMessages

node.switch.mount(wakuRelay)

Expand All @@ -482,7 +483,6 @@ proc mountRelay*(node: WakuNode,
return

node.wakuRelay = wakuRelay
wakuRelay.keepAlive = keepAlive

node.subscribe(defaultTopic, none(TopicHandler))

Expand Down Expand Up @@ -522,6 +522,29 @@ proc mountLightPush*(node: WakuNode) =

node.switch.mount(node.wakuLightPush)

proc mountKeepalive*(node: WakuNode) =
  ## Creates a `WakuKeepalive` protocol instance from the node's peer manager
  ## and RNG, stores it in `node.wakuKeepalive` and mounts it on the node's
  ## switch.
  info "mounting keepalive"

  let keepaliveProto = WakuKeepalive.new(node.peerManager, node.rng)
  node.wakuKeepalive = keepaliveProto
  node.switch.mount(keepaliveProto)

proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
  ## Background loop: for as long as `node.started` holds, sends a keepalive
  ## round via `node.wakuKeepalive` and then sleeps for `keepalive`.
  while true:
    if not node.started:
      break

    # Keep all managed peers alive when idle
    trace "Running keepalive"
    await node.wakuKeepalive.keepAllAlive()

    # Wait one full interval before the next round.
    await sleepAsync(keepalive)

proc startKeepalive*(node: WakuNode) =
  ## Spawns the keepalive loop for `node` using a fixed five-minute interval.
  ## The loop is detached with `asyncSpawn`; this proc returns immediately.

  # 50% of the default chronosstream timeout duration
  let interval = 5.minutes

  info "starting keepalive", keepalive=interval

  asyncSpawn keepaliveLoop(node, interval)

## Helpers
proc dialPeer*(n: WakuNode, address: string) {.async.} =
info "dialPeer", address = address
Expand Down Expand Up @@ -704,14 +727,15 @@ when isMainModule:
setStorePeer(node, conf.storenode)



# Relay setup
mountRelay(node,
conf.topics.split(" "),
rlnRelayEnabled = conf.rlnRelay,
keepAlive = conf.keepAlive,
relayMessages = conf.relay) # Indicates if node is capable to relay messages

# Keepalive mounted on all nodes
mountKeepalive(node)

# Resume historical messages, this has to be called after the relay setup
if conf.store and conf.persistMessages:
waitFor node.resume()
Expand Down Expand Up @@ -762,5 +786,9 @@ when isMainModule:
quit(QuitSuccess)

c_signal(SIGTERM, handleSigterm)

# Start keepalive, if enabled
if conf.keepAlive:
node.startKeepalive()

runForever()
85 changes: 85 additions & 0 deletions waku/v2/protocol/waku_keepalive/waku_keepalive.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import
std/[tables, sequtils, options],
bearssl,
chronos, chronicles, metrics, stew/results,
libp2p/protocols/pubsub/pubsubpeer,
libp2p/protocols/pubsub/floodsub,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
libp2p/crypto/crypto,
../../utils/requests,
../../node/peer_manager/peer_manager,
../message_notifier,
../waku_relay,
waku_keepalive_types

export waku_keepalive_types

# Metrics exposed by this module. `waku_keepalive_count` is incremented by the
# default protocol handler; `waku_keepalive_errors` is incremented with a
# "type" label value taken from the error-type constants below.
declarePublicGauge waku_keepalive_count, "number of keepalives received"
declarePublicGauge waku_keepalive_errors, "number of keepalive protocol errors", ["type"]

# Log topic applied to every log statement in this module.
logScope:
  topics = "wakukeepalive"

# Protocol identifier used both when mounting the handler and when dialing
# peers to send a keepalive.
const
  WakuKeepaliveCodec* = "/vac/waku/keepalive/2.0.0-alpha1"

# Error types (metric label values)
const
  dialFailure = "dial_failure"

# Encoding and decoding -------------------------------------------------------
proc encode*(msg: KeepaliveMessage): ProtoBuffer =
  ## Serialises `msg` into a protobuf buffer. Nothing is written to the
  ## buffer at present, so the returned buffer is a freshly initialised one.

  # @TODO: Currently no fields defined for a KeepaliveMessage
  result = initProtoBuffer()

proc init*(T: type KeepaliveMessage, buffer: seq[byte]): ProtoResult[T] =
  ## Decodes a `KeepaliveMessage` from `buffer`. No fields are read from the
  ## buffer at present; the result is always `ok` with a default message.

  # Placeholder decoder, kept for when the message gains fields.
  let decoder = initProtoBuffer(buffer)

  # @TODO: Currently no fields defined for a KeepaliveMessage
  let decoded = KeepaliveMessage()

  ok(decoded)

# Protocol -------------------------------------------------------
proc new*(T: type WakuKeepalive, peerManager: PeerManager, rng: ref BrHmacDrbgContext): T =
  ## Creates and initialises a `WakuKeepalive` protocol instance.
  ##
  ## * `peerManager` - peer manager later used to enumerate and dial peers.
  ## * `rng` - random number generator stored on the instance. When `nil`, a
  ##   new generator is created with `crypto.newRng()`.
  ##
  ## Returns the instance after `init` has installed its codec and handler.
  debug "new WakuKeepalive"

  var wk: WakuKeepalive
  new wk

  # Honour the caller-supplied RNG instead of silently discarding it; only
  # fall back to a fresh generator when none was provided.
  wk.rng = if rng.isNil: crypto.newRng() else: rng
  wk.peerManager = peerManager

  wk.init()

  return wk

method init*(wk: WakuKeepalive) =
  ## Installs the keepalive codec and the default stream handler on `wk`.
  ## The default handler only logs the event and increments
  ## `waku_keepalive_count`.
  debug "init WakuKeepalive"

  proc onKeepalive(conn: Connection, proto: string) {.async, gcsafe, closure.} =
    info "WakuKeepalive message received"
    waku_keepalive_count.inc()

  wk.codec = WakuKeepaliveCodec
  wk.handler = onKeepalive

proc keepAllAlive*(wk: WakuKeepalive) {.async, gcsafe.} =
  ## Sends one keepalive message to every peer known to the peer manager whose
  ## connectedness is `Connected`.
  ##
  ## A peer that cannot be dialed is logged and counted in
  ## `waku_keepalive_errors` (label `dial_failure`), then skipped; the
  ## remaining peers are still processed.

  # Send keepalive message to all managed and connected peers
  let connectedPeers = wk.peerManager.peers().filterIt(
    wk.peerManager.connectedness(it.peerId) == Connected)
  let peers = connectedPeers.mapIt(it.toPeerInfo())

  for peer in peers:
    let connOpt = await wk.peerManager.dialPeer(peer, WakuKeepaliveCodec)

    if connOpt.isNone():
      # @TODO more sophisticated error handling here
      error "failed to connect to remote peer"
      waku_keepalive_errors.inc(labelValues = [dialFailure])
      # One unreachable peer must not prevent keepalives to the others.
      continue

    # NOTE(review): the dialed stream is not closed here — confirm who owns
    # its lifetime.
    await connOpt.get().writeLP(KeepaliveMessage().encode().buffer) # Send keep-alive on connection
12 changes: 12 additions & 0 deletions waku/v2/protocol/waku_keepalive/waku_keepalive_types.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import
bearssl,
libp2p/protocols/protocol,
../../node/peer_manager/peer_manager

type
  ## Payload exchanged by the keepalive protocol.
  KeepaliveMessage* = object
    # Currently no fields for a keepalive message

  ## Keepalive protocol state, derived from libp2p's `LPProtocol`.
  WakuKeepalive* = ref object of LPProtocol
    rng*: ref BrHmacDrbgContext   # random number generator held by the protocol instance
    peerManager*: PeerManager     # peer manager used to enumerate and dial peers
27 changes: 0 additions & 27 deletions waku/v2/protocol/waku_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,9 @@ logScope:

const
WakuRelayCodec* = "/vac/waku/relay/2.0.0-beta2"
DefaultKeepAlive = 5.minutes # 50% of the default chronosstream timeout duration

type
WakuRelay* = ref object of GossipSub
keepAlive*: bool

proc keepAlive*(w: WakuRelay) {.async.} =
while w.keepAlive:
# Keep all mesh peers alive when idle
trace "Running keepalive"

for topic in w.topics.keys:
trace "Keepalive on topic", topic=topic
let
# Mesh peers for topic
mpeers = toSeq(w.mesh.getOrDefault(topic))
# Peers we're backing off from on topic
backoffPeers = w.backingOff.getOrDefault(topic)
# Only keep peers alive that we're not backing off from
keepAlivePeers = mpeers.filterIt(not backoffPeers.hasKey(it.peerId))

w.broadcast(keepAlivePeers, RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)]))))

await sleepAsync(DefaultKeepAlive)

method init*(w: WakuRelay) =
debug "init"
Expand Down Expand Up @@ -104,14 +83,8 @@ method unsubscribeAll*(w: WakuRelay,
method start*(w: WakuRelay) {.async.} =
debug "start"
await procCall GossipSub(w).start()

if w.keepAlive:
# Keep connection to mesh peers alive over periods of idleness
asyncSpawn keepAlive(w)

method stop*(w: WakuRelay) {.async.} =
debug "stop"

w.keepAlive = false

await procCall GossipSub(w).stop()