feat(networking): add service slots to peer manager #1473

Merged: 8 commits, Jan 26, 2023
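In short, this PR adds per-protocol "service slots" to the peer manager: addServicePeer() pins one peer per service protocol (store, filter, lightpush, peer exchange; relay peers are rejected), and selectPeer() now prefers that slot over the peer store for non-relay protocols. A minimal usage sketch based only on the procs in this diff, assuming a node: WakuNode constructed as in wakunode2.nim; the multiaddress is illustrative:

# Sketch only: pin a store service peer, then resolve a peer by protocol.
let storePeer = parseRemotePeerInfo(
  "/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D1")

node.peerManager.addServicePeer(storePeer, WakuStoreCodec)  # fills the store slot
node.peerManager.start()                                    # spawns relayConnectivityLoop

let storeTarget = node.peerManager.selectPeer(WakuStoreCodec)
if storeTarget.isSome():
  # the service slot wins over any peer-store match for WakuStoreCodec
  echo "store queries go to ", storeTarget.get().peerId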
19 changes: 15 additions & 4 deletions apps/wakunode2/wakunode2.nim
@@ -40,6 +40,9 @@ import
../../waku/v2/protocol/waku_archive/retention_policy,
../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_capacity,
../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_time,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_lightpush,
../../waku/v2/protocol/waku_peer_exchange,
../../waku/v2/utils/peers,
../../waku/v2/utils/wakuenr,
@@ -401,7 +404,8 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
if conf.storenode != "":
try:
mountStoreClient(node)
setStorePeer(node, conf.storenode)
let storenode = parseRemotePeerInfo(conf.storenode)
node.peerManager.addServicePeer(storenode, WakuStoreCodec)
except:
return err("failed to set node waku store peer: " & getCurrentExceptionMsg())

@@ -415,7 +419,8 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
if conf.lightpushnode != "":
try:
mountLightPushClient(node)
setLightPushPeer(node, conf.lightpushnode)
let lightpushnode = parseRemotePeerInfo(conf.lightpushnode)
node.peerManager.addServicePeer(lightpushnode, WakuLightPushCodec)
except:
return err("failed to set node waku lightpush peer: " & getCurrentExceptionMsg())

@@ -429,7 +434,8 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
if conf.filternode != "":
try:
await mountFilterClient(node)
setFilterPeer(node, conf.filternode)
let filternode = parseRemotePeerInfo(conf.filternode)
node.peerManager.addServicePeer(filternode, WakuFilterCodec)
except:
return err("failed to set node waku filter peer: " & getCurrentExceptionMsg())

@@ -442,7 +448,8 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,

if conf.peerExchangeNode != "":
try:
setPeerExchangePeer(node, conf.peerExchangeNode)
let peerExchangeNode = parseRemotePeerInfo(conf.peerExchangeNode)
node.peerManager.addServicePeer(peerExchangeNode, WakuPeerExchangeCodec)
except:
return err("failed to set node waku peer-exchange peer: " & getCurrentExceptionMsg())

@@ -498,6 +505,10 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
if conf.keepAlive:
node.startKeepalive()

# Maintain relay connections
if conf.relay:
node.peerManager.start()

return ok()

when defined(waku_exp_store_resume):
95 changes: 94 additions & 1 deletion tests/v2/test_peer_manager.nim
@@ -15,7 +15,8 @@ import
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/rpc/message
libp2p/protocols/pubsub/rpc/message,
libp2p/builders
import
../../waku/common/sqlite,
../../waku/v2/node/peer_manager/peer_manager,
@@ -24,6 +25,8 @@ import
../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_lightpush,
../../waku/v2/protocol/waku_peer_exchange,
../../waku/v2/protocol/waku_swap/waku_swap,
../test_helpers,
./testlib/testutils
@@ -401,3 +404,93 @@ procSuite "Peer Manager":
nodes[3].peerManager.peerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] == Outbound

await allFutures(nodes.mapIt(it.stop()))

asyncTest "Peer store addServicePeer() stores service peers":
# Valid peer id missing the last digit
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"

let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60932))
peer1 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & "1")
peer2 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30301/p2p/" & basePeerId & "2")
peer3 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30302/p2p/" & basePeerId & "3")
peer4 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30303/p2p/" & basePeerId & "4")
peer5 = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30303/p2p/" & basePeerId & "5")

# service peers
node.peerManager.addServicePeer(peer1, WakuStoreCodec)
node.peerManager.addServicePeer(peer2, WakuFilterCodec)
node.peerManager.addServicePeer(peer3, WakuLightPushCodec)
node.peerManager.addServicePeer(peer4, WakuPeerExchangeCodec)

# relay peers (should not be added)
node.peerManager.addServicePeer(peer5, WakuRelayCodec)

# all peers are stored in the peerstore
check:
node.peerManager.peerStore.peers().anyIt(it.peerId == peer1.peerId)
node.peerManager.peerStore.peers().anyIt(it.peerId == peer2.peerId)
node.peerManager.peerStore.peers().anyIt(it.peerId == peer3.peerId)
node.peerManager.peerStore.peers().anyIt(it.peerId == peer4.peerId)

# but the relay peer is not
node.peerManager.peerStore.peers().anyIt(it.peerId == peer5.peerId) == false

# all service peers are added to their respective service slots
check:
node.peerManager.serviceSlots[WakuStoreCodec].peerId == peer1.peerId
node.peerManager.serviceSlots[WakuFilterCodec].peerId == peer2.peerId
node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peer3.peerId
node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peer4.peerId

# but the relay peer is not
node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false

test "selectPeer() returns the correct peer":
# Valid peer id missing the last digit
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"

# Create peer manager
let pm = PeerManager.new(
switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise().build(),
storage = nil)

# Create 3 peer infos
let peers = toSeq(1..3).mapIt(parseRemotePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it))

# Add peer[0] to the peerstore
pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs
pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuFilterCodec]

# When no service peers, we get one from the peerstore
let selectedPeer1 = pm.selectPeer(WakuStoreCodec)
check:
selectedPeer1.isSome() == true
selectedPeer1.get().peerId == peers[0].peerId

# Same for other protocol
let selectedPeer2 = pm.selectPeer(WakuFilterCodec)
check:
selectedPeer2.isSome() == true
selectedPeer2.get().peerId == peers[0].peerId

# And return none if we don't have any peer for that protocol
let selectedPeer3 = pm.selectPeer(WakuLightPushCodec)
check:
selectedPeer3.isSome() == false

# Now we add service peers for different protocols (peers[1] and peers[2])
pm.addServicePeer(peers[1], WakuStoreCodec)
pm.addServicePeer(peers[2], WakuLightPushCodec)

# We no longer get one from the peerstore. Slots are being used instead.
let selectedPeer4 = pm.selectPeer(WakuStoreCodec)
check:
selectedPeer4.isSome() == true
selectedPeer4.get().peerId == peers[1].peerId

let selectedPeer5 = pm.selectPeer(WakuLightPushCodec)
check:
selectedPeer5.isSome() == true
selectedPeer5.get().peerId == peers[2].peerId
10 changes: 0 additions & 10 deletions tests/v2/test_peer_store_extended.nim
@@ -267,16 +267,6 @@ suite "Extended nim-libp2p Peer Store":
peerStore.hasPeers(protocolMatcher("/vac/waku/store/2.0.0"))
not peerStore.hasPeers(protocolMatcher("/vac/waku/does-not-exist/2.0.0"))

test "selectPeer() returns if a peer supports a given protocol":
# When
let swapPeer = peerStore.selectPeer("/vac/waku/swap/2.0.0")

# Then
check:
swapPeer.isSome()
swapPeer.get().peerId == p5
swapPeer.get().protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]

test "getPeersByDirection()":
# When
let inPeers = peerStore.getPeersByDirection(Inbound)
6 changes: 3 additions & 3 deletions waku/v2/node/jsonrpc/store_api.nim
@@ -13,7 +13,7 @@ import
../../utils/time,
../waku_node,
../peer_manager/peer_manager,
./jsonrpc_types,
./jsonrpc_types,
./jsonrpc_utils

export jsonrpc_types
@@ -30,7 +30,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
## Returns history for a list of content topics with optional paging
debug "get_waku_v2_store_v1_messages"

let peerOpt = node.peerManager.peerStore.selectPeer(WakuStoreCodec)
let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
raise newException(ValueError, "no suitable remote store peers")

@@ -52,7 +52,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =

if not await queryFut.withTimeout(futTimeout):
raise newException(ValueError, "No history response received (timeout)")

let res = queryFut.read()
if res.isErr():
raise newException(ValueError, $res.error)
65 changes: 60 additions & 5 deletions waku/v2/node/peer_manager/peer_manager.nim
@@ -40,10 +40,10 @@ const
InitialBackoffInSec = 120
BackoffFactor = 4

# limit the amount of paralel dials
# Limit the amount of paralel dials
MaxParalelDials = 10

# delay between consecutive relayConnectivityLoop runs
# Delay between consecutive relayConnectivityLoop runs
ConnectivityLoopInterval = chronos.seconds(30)

type
@@ -54,6 +54,8 @@ type
backoffFactor*: int
maxFailedAttempts*: int
storage: PeerStorage
serviceSlots*: Table[string, RemotePeerInfo]
started: bool

####################
# Helper functions #
@@ -105,7 +107,10 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
pm.peerStore[ConnectionBook][peerId] = CannotConnect

debug "Dialing peer failed", peerId = peerId, reason = reasonFailed, failedAttempts=failedAttempts
debug "Dialing peer failed",
peerId = peerId,
reason = reasonFailed,
failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
waku_peers_dials.inc(labelValues = [reasonFailed])

# Update storage
@@ -192,13 +197,14 @@ proc new*(T: type PeerManager,
initialBackoffInSec: initialBackoffInSec,
backoffFactor: backoffFactor,
maxFailedAttempts: maxFailedAttempts)

proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerId, event)

pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)

pm.serviceSlots = initTable[string, RemotePeerInfo]()

if not storage.isNil():
debug "found persistent peer storage"
pm.loadFromStorage() # Load previously managed peers.
@@ -239,6 +245,20 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
if not pm.storage.isNil:
pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)

proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
# Do not add relay peers
if proto == WakuRelayCodec:
warn "Can't add relay peer to service peers slots"
return

info "Adding peer to service slots", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto

# Set peer for service slot
pm.serviceSlots[proto] = remotePeerInfo

# TODO: Remove proto once fully refactored
pm.addPeer(remotePeerInfo, proto)

proc reconnectPeers*(pm: PeerManager,
proto: string,
protocolMatcher: Matcher,
@@ -335,7 +355,8 @@ proc connectToNodes*(pm: PeerManager,

# Ensures a healthy amount of connected relay peers
proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
while true:
debug "Starting relay connectivity loop"
while pm.started:

let maxConnections = pm.switch.connManager.inSema.size
let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
@@ -364,3 +385,37 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect], WakuRelayCodec)

await sleepAsync(ConnectivityLoopInterval)

proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
Review comment (Contributor):
The proc name is too generic. Rename this method to something like selectPeerByProto
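For illustration, a hedged sketch of that suggestion as a thin wrapper (hypothetical, not part of this diff; it would have to be declared after the existing proc):

proc selectPeerByProto*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
  # Hypothetical wrapper named per the review suggestion; delegates to the
  # selectPeer proc added below in this diff.
  pm.selectPeer(proto)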

debug "Selecting peer from peerstore", protocol=proto

# Selects the best peer for a given protocol
let peers = pm.peerStore.getPeersByProtocol(proto)

# No criteria for selecting a peer for WakuRelay, random one
if proto == WakuRelayCodec:
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
if peers.len > 0:
debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
return some(peers[0].toRemotePeerInfo())
debug "No peer found for protocol", protocol=proto
return none(RemotePeerInfo)

# For other protocols, we select the peer that is slotted for the given protocol
pm.serviceSlots.withValue(proto, serviceSlot):
debug "Got peer from service slots", peerId=serviceSlot[].peerId, multi=serviceSlot[].addrs[0], protocol=proto
return some(serviceSlot[])

# If not slotted, we select a random peer for the given protocol
if peers.len > 0:
debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
return some(peers[0].toRemotePeerInfo())
debug "No peer found for protocol", protocol=proto
return none(RemotePeerInfo)

proc start*(pm: PeerManager) =
pm.started = true
asyncSpawn pm.relayConnectivityLoop()

proc stop*(pm: PeerManager) =
pm.started = false
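The new started flag gates relayConnectivityLoop: start() sets it and spawns the loop, stop() clears it so the loop exits on its next pass. A minimal lifecycle sketch mirroring the test setup earlier in this PR (rng is assumed to come from the test helpers; storage = nil keeps the peer store in memory):

let pm = PeerManager.new(
  switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise().build(),
  storage = nil)

pm.start()   # sets pm.started and asyncSpawns relayConnectivityLoop
# ... the loop tops up relay connections every ConnectivityLoopInterval while started
pm.stop()    # clears pm.started; the loop exits on its next iteration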
19 changes: 7 additions & 12 deletions waku/v2/node/peer_manager/waku_peer_store.nim
@@ -152,6 +152,10 @@ proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness =
# TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
return peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)

proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool =
# Returns `true` if the peer is connected
peerStore.connectedness(peerId) == Connected

proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool =
# Returns `true` if peer is included in manager for the specified protocol
# TODO: What if peer does not exist in the peerStore?
@@ -165,20 +169,11 @@ proc hasPeers*(peerStore: PeerStore, protocolMatcher: Matcher): bool =
# Returns `true` if the peerstore has any peer matching the protocolMatcher
toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it)))

proc selectPeer*(peerStore: PeerStore, proto: string): Option[RemotePeerInfo] =
# Selects the best peer for a given protocol
let peers = peerStore.peers().filterIt(it.protos.contains(proto))

if peers.len >= 1:
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
let peerStored = peers[0]

return some(peerStored.toRemotePeerInfo())
else:
return none(RemotePeerInfo)

proc getPeersByDirection*(peerStore: PeerStore, direction: PeerDirection): seq[StoredInfo] =
return peerStore.peers.filterIt(it.direction == direction)

proc getNotConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] =
return peerStore.peers.filterIt(it.connectedness != Connected)

proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[StoredInfo] =
return peerStore.peers.filterIt(it.protos.contains(proto))