diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim
index 47cb2aea5e..e80d004e66 100644
--- a/apps/chat2/chat2.nim
+++ b/apps/chat2/chat2.nim
@@ -9,10 +9,9 @@ when (NimMajor, NimMinor) < (1, 4):
 else:
   {.push raises: [].}
 
-import std/[strformat, strutils, times, json, options, random]
+import std/[strformat, strutils, times, options, random]
 import confutils, chronicles, chronos, stew/shims/net as stewNet,
        eth/keys, bearssl, stew/[byteutils, results],
-       nimcrypto/pbkdf2,
        metrics,
       metrics/chronos_httpserver
 import libp2p/[switch,                   # manage transports, a single entry point for dialing and listening
@@ -22,11 +21,10 @@ import libp2p/[switch,                   # manage transports, a single entry poi
                peerinfo,                 # manage the information of a peer, such as peer ID and public / private key
                peerid,                   # Implement how peers interact
                protobuf/minprotobuf,     # message serialisation/deserialisation from and to protobufs
-               protocols/secure/secio,   # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
                nameresolving/dnsresolver]# define DNS resolution
 import
   ../../waku/waku_core,
-  ../../waku/waku_lightpush,
+  ../../waku/waku_lightpush/common,
   ../../waku/waku_lightpush/rpc,
   ../../waku/waku_filter,
   ../../waku/waku_enr,
diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim
index 6308ea71c0..a8516a7b0a 100644
--- a/apps/wakunode2/app.nim
+++ b/apps/wakunode2/app.nim
@@ -53,7 +53,7 @@ import
   ../../waku/waku_peer_exchange,
   ../../waku/waku_rln_relay,
   ../../waku/waku_store,
-  ../../waku/waku_lightpush,
+  ../../waku/waku_lightpush/common,
   ../../waku/waku_filter,
   ../../waku/waku_filter_v2,
   ./wakunode2_validator_signed,
diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim
index 8576f2725a..a480adc500 100644
--- a/tests/test_peer_manager.nim
+++ b/tests/test_peer_manager.nim
@@ -1,7 +1,7 @@
 {.used.}
 
 import
-  std/[options, sequtils, times],
+  std/[options, sequtils, times, sugar],
   stew/shims/net as stewNet,
   testutils/unittests,
   chronos,
@@ -21,10 +21,12 @@ import
   ../../waku/node/peer_manager/peer_manager,
   ../../waku/node/peer_manager/peer_store/waku_peer_storage,
   ../../waku/waku_node,
-  ../../waku/waku_relay,
-  ../../waku/waku_store,
-  ../../waku/waku_filter,
-  ../../waku/waku_lightpush,
+  ../../waku/waku_core,
+  ../../waku/waku_enr/capabilities,
+  ../../waku/waku_relay/protocol,
+  ../../waku/waku_store/common,
+  ../../waku/waku_filter/protocol,
+  ../../waku/waku_lightpush/common,
   ../../waku/waku_peer_exchange,
   ../../waku/waku_metadata,
   ./testlib/common,
@@ -128,7 +130,6 @@ procSuite "Peer Manager":
 
       await node.stop()
 
-
   asyncTest "Peer manager keeps track of connections":
     # Create 2 nodes
     let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
@@ -225,18 +226,34 @@ procSuite "Peer Manager":
     let
       database = SqliteDatabase.new(":memory:")[]
       storage = WakuPeerStorage.new(database)[]
-      node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
-      node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
-      peerInfo2 = node2.switch.peerInfo
+      node1 = newTestWakuNode(
+        generateSecp256k1Key(),
+        ValidIpAddress.init("127.0.0.1"),
+        Port(44048),
+        peerStorage = storage
+      )
+      node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023))
+
+    node1.mountMetadata(0).expect("Mounted Waku Metadata")
+    node2.mountMetadata(0).expect("Mounted Waku Metadata")
Waku Metadata") await node1.start() await node2.start() await node1.mountRelay() await node2.mountRelay() + + let peerInfo2 = node2.switch.peerInfo + var remotePeerInfo2 = peerInfo2.toRemotePeerInfo() + remotePeerInfo2.enr = some(node2.enr) - require: - (await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true + let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2) + assert is12Connected == true, "Node 1 and 2 not connected" + + check: + node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs + + # wait for the peer store update await sleepAsync(chronos.milliseconds(500)) check: @@ -246,10 +263,17 @@ procSuite "Peer Manager": node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected # Simulate restart by initialising a new node using the same storage - let - node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage) + let node3 = newTestWakuNode( + generateSecp256k1Key(), + ValidIpAddress.init("127.0.0.1"), + Port(56037), + peerStorage = storage + ) + + node3.mountMetadata(0).expect("Mounted Waku Metadata") await node3.start() + check: # Node2 has been loaded after "restart", but we have not yet reconnected node3.peerManager.peerStore.peers().len == 1 @@ -257,7 +281,10 @@ procSuite "Peer Manager": node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected await node3.mountRelay() - await node3.peerManager.connectToRelayPeers() + + await node3.peerManager.manageRelayPeers() + + await sleepAsync(chronos.milliseconds(500)) check: # Reconnected to node2 after "restart" @@ -297,9 +324,9 @@ procSuite "Peer Manager": topics = @["/waku/2/rs/4/0"], ) - discard node1.mountMetadata(clusterId3) - discard node2.mountMetadata(clusterId4) - discard node3.mountMetadata(clusterId4) + node1.mountMetadata(clusterId3).expect("Mounted Waku Metadata") + node2.mountMetadata(clusterId4).expect("Mounted Waku Metadata") + node3.mountMetadata(clusterId4).expect("Mounted Waku Metadata") # Start nodes await allFutures([node1.start(), node2.start(), node3.start()]) @@ -318,7 +345,6 @@ procSuite "Peer Manager": conn2.isNone conn3.isSome - # TODO: nwaku/issues/1377 xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers": let @@ -377,14 +403,28 @@ procSuite "Peer Manager": asyncTest "Peer manager connects to all peers supporting a given protocol": # Create 4 nodes - let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) + let nodes = + toSeq(0..<4) + .mapIt( + newTestWakuNode( + nodeKey = generateSecp256k1Key(), + bindIp = ValidIpAddress.init("0.0.0.0"), + bindPort = Port(0), + wakuFlags = some(CapabilitiesBitfield.init(@[Relay])) + ) + ) # Start them - await allFutures(nodes.mapIt(it.start())) + discard nodes.mapIt(it.mountMetadata(0)) await allFutures(nodes.mapIt(it.mountRelay())) + await allFutures(nodes.mapIt(it.start())) # Get all peer infos - let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo()) + let peerInfos = collect: + for i in 0..nodes.high: + let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo() + peerInfo.enr = some(nodes[i].enr) + peerInfo # Add all peers (but self) to node 0 nodes[0].peerManager.addPeer(peerInfos[1]) @@ -392,7 +432,7 @@ procSuite "Peer Manager": nodes[0].peerManager.addPeer(peerInfos[3]) # Connect to relay peers - await nodes[0].peerManager.connectToRelayPeers() + await nodes[0].peerManager.manageRelayPeers() check: # Peerstore track 
       # Peerstore track all three peers
diff --git a/tests/test_waku_lightpush.nim b/tests/test_waku_lightpush.nim
index de125ac974..76312272ff 100644
--- a/tests/test_waku_lightpush.nim
+++ b/tests/test_waku_lightpush.nim
@@ -11,6 +11,7 @@ import
   ../../waku/node/peer_manager,
   ../../waku/waku_core,
   ../../waku/waku_lightpush,
+  ../../waku/waku_lightpush/common,
   ../../waku/waku_lightpush/client,
   ../../waku/waku_lightpush/protocol_metrics,
   ../../waku/waku_lightpush/rpc,
diff --git a/tests/test_wakunode_lightpush.nim b/tests/test_wakunode_lightpush.nim
index 7208c587ba..4daa122838 100644
--- a/tests/test_wakunode_lightpush.nim
+++ b/tests/test_wakunode_lightpush.nim
@@ -4,16 +4,12 @@ import
   std/options,
   stew/shims/net as stewNet,
   testutils/unittests,
-  chronicles,
-  chronos,
-  libp2p/crypto/crypto,
-  libp2p/switch
+  chronos
 import
   ../../waku/waku_core,
-  ../../waku/waku_lightpush,
+  ../../waku/waku_lightpush/common,
   ../../waku/node/peer_manager,
   ../../waku/waku_node,
-  ./testlib/common,
   ./testlib/wakucore,
   ./testlib/wakunode
 
diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim
index f614b272b5..9ba791204f 100644
--- a/tests/testlib/wakunode.nim
+++ b/tests/testlib/wakunode.nim
@@ -32,7 +32,8 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
     dnsAddrsNameServers: @[ValidIpAddress.init("1.1.1.1"), ValidIpAddress.init("1.0.0.1")],
     nat: "any",
     maxConnections: 50,
-    topics: @[],
+    clusterId: 1.uint32,
+    topics: @["/waku/2/rs/1/0"],
     relay: true
   )
 
@@ -55,8 +56,8 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
                       dns4DomainName = none(string),
                       discv5UdpPort = none(Port),
                       agentString = none(string),
-                      clusterId: uint32 = 2.uint32,
-                      topics: seq[string] = @["/waku/2/rs/2/0"],
+                      clusterId: uint32 = 1.uint32,
+                      topics: seq[string] = @["/waku/2/rs/1/0"],
                       peerStoreCapacity = none(int)): WakuNode =
 
   var resolvedExtIp = extIp
@@ -66,7 +67,10 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
     if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000))
     else: extPort
 
-  let conf = defaultTestWakuNodeConf()
+  var conf = defaultTestWakuNodeConf()
+
+  conf.clusterId = clusterId
+  conf.topics = topics
 
   if dns4DomainName.isSome() and extIp.isNone():
     # If there's an error resolving the IP, an exception is thrown and test fails
diff --git a/tests/wakunode_rest/test_rest_lightpush.nim b/tests/wakunode_rest/test_rest_lightpush.nim
index 5581da5c71..72fb4c7b86 100644
--- a/tests/wakunode_rest/test_rest_lightpush.nim
+++ b/tests/wakunode_rest/test_rest_lightpush.nim
@@ -10,11 +10,10 @@ import
 
 import
   ../../waku/waku_api/message_cache,
-  ../../waku/common/base64,
   ../../waku/waku_core,
   ../../waku/waku_node,
   ../../waku/node/peer_manager,
-  ../../waku/waku_lightpush,
+  ../../waku/waku_lightpush/common,
   ../../waku/waku_api/rest/server,
   ../../waku/waku_api/rest/client,
   ../../waku/waku_api/rest/responses,
diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim
index 69db53eed5..dd77aefb60 100644
--- a/waku/node/peer_manager/peer_manager.nim
+++ b/waku/node/peer_manager/peer_manager.nim
@@ -5,7 +5,7 @@ else:
 
 import
-  std/[options, sets, sequtils, times, strutils, math],
+  std/[options, sugar, sets, sequtils, times, strutils, math],
   chronos,
   chronicles,
   metrics,
@@ -17,6 +17,10 @@ import
   ../../waku_core,
   ../../waku_relay,
   ../../waku_enr/sharding,
+  ../../waku_enr/capabilities,
+  ../../waku_store/common,
+  ../../waku_filter_v2/common,
+  ../../waku_lightpush/common,
   ../../waku_metadata,
   ./peer_store/peer_storage,
   ./waku_peer_store
@@ -49,10 +53,10 @@ const
   BackoffFactor = 4
 
   # Limit the amount of paralel dials
-  MaxParalelDials = 10
+  MaxParallelDials = 10
 
   # Delay between consecutive relayConnectivityLoop runs
-  ConnectivityLoopInterval = chronos.seconds(15)
+  ConnectivityLoopInterval = chronos.minutes(1)
 
   # How often the peer store is pruned
   PrunePeerStoreInterval = chronos.minutes(10)
 
@@ -115,22 +119,21 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownO
     # Do not attempt to manage our unmanageable self
     return
 
-  # ...public key
-  var publicKey: PublicKey
-  discard remotePeerInfo.peerId.extractPublicKey(publicKey)
-
   if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and
-     pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey and
+     pm.peerStore[KeyBook][remotePeerInfo.peerId] == remotePeerInfo.publicKey and
      pm.peerStore[ENRBook][remotePeerInfo.peerId].raw.len > 0:
     # Peer already managed and ENR info is already saved
     return
 
   trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs
-
+
   pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
-  pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
+  pm.peerStore[KeyBook][remotePeerInfo.peerId] = remotePeerInfo.publicKey
   pm.peerStore[SourceBook][remotePeerInfo.peerId] = origin
-
+
+  if remotePeerInfo.protocols.len > 0:
+    pm.peerStore[ProtoBook][remotePeerInfo.peerId] = remotePeerInfo.protocols
+
   if remotePeerInfo.enr.isSome():
     pm.peerStore[ENRBook][remotePeerInfo.peerId] = remotePeerInfo.enr.get()
 
@@ -158,27 +161,31 @@ proc connectRelay*(pm: PeerManager,
     pm.addPeer(peer)
 
   let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
-  debug "Connecting to relay peer", wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts
+  debug "Connecting to relay peer",
+    wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts
 
   var deadline = sleepAsync(dialTimeout)
-  var workfut = pm.switch.connect(peerId, peer.addrs)
-  var reasonFailed = ""
-
-  try:
-    await workfut or deadline
-    if workfut.finished():
+  let workfut = pm.switch.connect(peerId, peer.addrs)
+
+  # Can't use catch: with .withTimeout() in this case
+  let res = catch: await workfut or deadline
+
+  let reasonFailed =
+    if not workfut.finished():
+      await workfut.cancelAndWait()
+      "timed out"
+    elif res.isErr(): res.error.msg
+    else:
       if not deadline.finished():
-        deadline.cancel()
+        await deadline.cancelAndWait()
+
       waku_peers_dials.inc(labelValues = ["successful"])
       waku_node_conns_initiated.inc(labelValues = [source])
+
       pm.peerStore[NumberFailedConnBook][peerId] = 0
-      return true
-    else:
-      reasonFailed = "timed out"
-      await cancelAndWait(workfut)
-  except CatchableError as exc:
-    reasonFailed = "remote peer failed"
+
+      return true
 
   # Dial failed
   pm.peerStore[NumberFailedConnBook][peerId] = pm.peerStore[NumberFailedConnBook][peerId] + 1
   pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
@@ -214,15 +221,15 @@ proc dialPeer(pm: PeerManager,
   # Dial Peer
   let dialFut = pm.switch.dial(peerId, addrs, proto)
-  var reasonFailed = ""
-  try:
-    if (await dialFut.withTimeout(dialTimeout)):
+
+  let res = catch:
+    if await dialFut.withTimeout(dialTimeout):
       return some(dialFut.read())
-    else:
-      reasonFailed = "timeout"
-      await cancelAndWait(dialFut)
-  except CatchableError as exc:
-    reasonFailed = "failed"
+    else: await cancelAndWait(dialFut)
+
+  let reasonFailed =
+    if res.isOk: "timed out"
+    else: res.error.msg
 
   debug "Dialing peer failed", peerId=peerId, reason=reasonFailed, proto=proto
 
@@ -293,105 +300,108 @@ proc canBeConnected*(pm: PeerManager,
   let now = Moment.init(getTime().toUnix, Second)
   let lastFailed = pm.peerStore[LastFailedConnBook][peerId]
   let backoff = calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts)
-  if now >= (lastFailed + backoff):
-    return true
-  return false
+
+  return now >= (lastFailed + backoff)
 
 ##################
 # Initialisation #
 ##################
 
 proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] =
-  if pm.switch.connManager.getConnections().hasKey(peerId):
-    let conns = pm.switch.connManager.getConnections().getOrDefault(peerId)
-    if conns.len != 0:
-      let observedAddr = conns[0].connection.observedAddr
-      let ip = observedAddr.get.getHostname()
-      if observedAddr.isSome:
-        # TODO: think if circuit relay ips should be handled differently
-        let ip = observedAddr.get.getHostname()
-        return some(ip)
-  return none(string)
+  if not pm.switch.connManager.getConnections().hasKey(peerId):
+    return none(string)
+
+  let conns = pm.switch.connManager.getConnections().getOrDefault(peerId)
+  if conns.len == 0:
+    return none(string)
+
+  let obAddr = conns[0].connection.observedAddr.valueOr:
+    return none(string)
+
+  # TODO: think if circuit relay ips should be handled differently
+
+  return some(obAddr.getHostname())
 
 # called when a connection i) is created or ii) is closed
 proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
   case event.kind
-  of ConnEventKind.Connected:
-    let direction = if event.incoming: Inbound else: Outbound
-    discard
-  of ConnEventKind.Disconnected:
-    discard
+    of ConnEventKind.Connected:
+      #let direction = if event.incoming: Inbound else: Outbound
+      discard
+    of ConnEventKind.Disconnected:
+      discard
+
+proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
+  # To prevent metadata protocol from breaking prev nodes, by now we only
+  # disconnect if the clusterid is specified.
+  if pm.wakuMetadata.clusterId == 0:
+    return
+
+  let res = catch: await pm.switch.dial(peerId, WakuMetadataCodec)
+
+  var reason: string
+  block guardClauses:
+    let conn = res.valueOr:
+      reason = "dial failed: " & error.msg
+      break guardClauses
+
+    let metadata = (await pm.wakuMetadata.request(conn)).valueOr:
+      reason = "waku metadata request failed: " & error
+      break guardClauses
+
+    let clusterId = metadata.clusterId.valueOr:
+      reason = "empty cluster-id reported"
+      break guardClauses
+
+    if pm.wakuMetadata.clusterId != clusterId:
+      reason = "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId
+      break guardClauses
+
+    if not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)):
+      reason = "no shards in common"
+      break guardClauses
+
+    return
+
+  info "disconnecting from peer", peerId=peerId, reason=reason
+  asyncSpawn(pm.switch.disconnect(peerId))
+  pm.peerStore.delete(peerId)
 
 # called when a peer i) first connects to us ii) disconnects all connections from us
 proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
+  if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined:
+    await pm.onPeerMetadata(peerId)
+
   var direction: PeerDirection
   var connectedness: Connectedness
 
-  if event.kind == PeerEventKind.Joined:
-    direction = if event.initiator: Outbound else: Inbound
-    connectedness = Connected
-
-    var clusterOk = false
-    var reason = ""
-    # To prevent metadata protocol from breaking prev nodes, by now we only
-    # disconnect if the clusterid is specified.
-    if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0:
-      block wakuMetadata:
-        var conn: Connection
-        try:
-          conn = await pm.switch.dial(peerId, WakuMetadataCodec)
-        except CatchableError:
-          reason = "waku metadata codec not supported: " & getCurrentExceptionMsg()
-          break wakuMetadata
-
-        # request metadata from connecting peer
-        let metadata = (await pm.wakuMetadata.request(conn)).valueOr:
-          reason = "failed waku metadata codec request"
-          break wakuMetadata
-
-        # does not report any clusterId
-        let clusterId = metadata.clusterId.valueOr:
-          reason = "empty clusterId reported"
-          break wakuMetadata
-
-        # drop it if it doesnt match our network id
-        if pm.wakuMetadata.clusterId != clusterId:
-          reason = "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId
-          break wakuMetadata
-
-        # reaching here means the clusterId matches
-        clusterOk = true
-
-    if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0 and not clusterOk:
-      info "disconnecting from peer", peerId=peerId, reason=reason
-      asyncSpawn(pm.switch.disconnect(peerId))
-      pm.peerStore.delete(peerId)
-
-    # TODO: Take action depending on the supported shards as reported by metadata
-
-    let ip = pm.getPeerIp(peerId)
-    if ip.isSome:
-      pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)
-
-      let peersBehindIp = pm.ipTable[ip.get]
-      if peersBehindIp.len > pm.colocationLimit:
+  case event.kind:
+    of Joined:
+      direction = if event.initiator: Outbound else: Inbound
+      connectedness = Connected
+
+      if (let ip = pm.getPeerIp(peerId); ip.isSome()):
+        pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)
+
         # in theory this should always be one, but just in case
-        for peerId in peersBehindIp[0..<(peersBehindIp.len - pm.colocationLimit)]:
+        let peersBehindIp = pm.ipTable[ip.get]
+
+        let idx = max((peersBehindIp.len - pm.colocationLimit), 0)
+        for peerId in peersBehindIp[0..<idx]:
           debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
           asyncSpawn(pm.switch.disconnect(peerId))
           pm.peerStore.delete(peerId)
-
-  elif event.kind == PeerEventKind.Left:
-    direction = UnknownDirection
-    connectedness = CanConnect
-
-    # note we cant access the peerId ip here as the connection was already closed
-    for ip, peerIds in pm.ipTable.pairs:
-      if peerIds.contains(peerId):
-        pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId)
-        if pm.ipTable[ip].len == 0:
-          pm.ipTable.del(ip)
-        break
+    of Left:
+      direction = UnknownDirection
+      connectedness = CanConnect
+
+      # note we cant access the peerId ip here as the connection was already closed
+      for ip, peerIds in pm.ipTable.pairs:
+        if peerIds.contains(peerId):
+          pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId)
+          if pm.ipTable[ip].len == 0:
+            pm.ipTable.del(ip)
+          break
 
   pm.peerStore[ConnectionBook][peerId] = connectedness
   pm.peerStore[DirectionBook][peerId] = direction
@@ -601,9 +611,10 @@ proc connectToNodes*(pm: PeerManager,
   # later.
   await sleepAsync(chronos.seconds(5))
 
-# Returns the peerIds of physical connections (in and out)
-# containing at least one stream with the given protocol.
 proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
+  ## Returns the peerIds of physical connections (in and out)
+  ## containing at least one stream with the given protocol.
+
   var inPeers: seq[PeerId]
   var outPeers: seq[PeerId]
 
@@ -633,30 +644,88 @@ proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
   return (numStreamsIn, numStreamsOut)
 
 proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
-  let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
+  if amount <= 0:
+    return
+
+  let (inRelayPeers, _) = pm.connectedPeers(WakuRelayCodec)
   let connsToPrune = min(amount, inRelayPeers.len)
 
   for p in inRelayPeers[0..<connsToPrune]:
+    trace "Pruning Peer", Peer = $p
     asyncSpawn(pm.switch.disconnect(p))
 
-proc connectToRelayPeers*(pm: PeerManager) {.async.} =
-  let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
-  let maxConnections = pm.switch.connManager.inSema.size
-  let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
-  let inPeersTarget = maxConnections - pm.outRelayPeersTarget
+proc manageRelayPeers*(pm: PeerManager) {.async.} =
+  if pm.wakuMetadata.shards.len == 0:
+    return
+
+  var peersToConnect: HashSet[PeerId] # Can't use RemotePeerInfo as they are ref objects
+  var peersToDisconnect: int
+
+  # Get all connected peers for Waku Relay
+  var (inPeers, outPeers) = pm.connectedPeers(WakuRelayCodec)
 
-  # TODO: Temporally disabled. Might be causing connection issues
-  #if inRelayPeers.len > pm.inRelayPeersTarget:
-  #  await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)
+  # Calculate in/out target number of peers for each shard
+  let inTarget = pm.inRelayPeersTarget div pm.wakuMetadata.shards.len
+  let outTarget = pm.outRelayPeersTarget div pm.wakuMetadata.shards.len
 
-  if outRelayPeers.len >= pm.outRelayPeersTarget:
+  for shard in pm.wakuMetadata.shards.items:
+    # Filter out peers not on this shard
+    let connectedInPeers = inPeers.filterIt(
+      pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)))
+
+    let connectedOutPeers = outPeers.filterIt(
+      pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)))
+
+    # Calculate the difference between current values and targets
+    let inPeerDiff = connectedInPeers.len - inTarget
+    let outPeerDiff = outTarget - connectedOutPeers.len
+
+    if inPeerDiff > 0:
+      peersToDisconnect += inPeerDiff
+
+    if outPeerDiff <= 0:
+      continue
+
+    # Get all peers for this shard
+    var connectablePeers = pm.peerStore.getPeersByShard(
+      uint16(pm.wakuMetadata.clusterId), uint16(shard))
+
+    let shardCount = connectablePeers.len
+
+    connectablePeers.keepItIf(
+      not pm.peerStore.isConnected(it.peerId) and
+      pm.canBeConnected(it.peerId))
+
+    let connectableCount = connectablePeers.len
+
+    connectablePeers.keepItIf(pm.peerStore.hasCapability(it.peerId, Relay))
+
+    let relayCount = connectablePeers.len
+
+    debug "Sharded Peer Management",
+      shard = shard,
+      connectable = $connectableCount & "/" & $shardCount,
+      relayConnectable = $relayCount & "/" & $shardCount,
+      relayInboundTarget = $connectedInPeers.len & "/" & $inTarget,
+      relayOutboundTarget = $connectedOutPeers.len & "/" & $outTarget
+
+    let length = min(outPeerDiff, connectablePeers.len)
+    for peer in connectablePeers[0..<length]:
+      trace "Peer To Connect To", peerId = $peer.peerId
+      peersToConnect.incl(peer.peerId)
+
+  await pm.pruneInRelayConns(peersToDisconnect)
+
+  if peersToConnect.len == 0:
     return
 
-  let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
-  let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
-  let numPeersToConnect = min(outsideBackoffPeers.len, MaxParalelDials)
+  let uniquePeers = toSeq(peersToConnect).mapIt(pm.peerStore.get(it))
 
-  await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect])
+  # Connect to all nodes
+  for i in countup(0, uniquePeers.len, MaxParallelDials):
+    let stop = min(i + MaxParallelDials, uniquePeers.len)
+    trace "Connecting to Peers", peerIds = $uniquePeers[i..<stop]
+    await pm.connectToNodes(uniquePeers[i..<stop])
 
 proc prunePeerStore*(pm: PeerManager) =
   let numPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
@@ -733,7 +802,7 @@ proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
 
 proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
   debug "Starting relay connectivity loop"
   while pm.started:
-    await pm.connectToRelayPeers()
+    await pm.manageRelayPeers()
 
     await sleepAsync(ConnectivityLoopInterval)
 
proc logAndMetrics(pm: PeerManager) {.async.} =
@@ -741,7 +810,6 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
     # log metrics
     let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
     let maxConnections = pm.switch.connManager.inSema.size
-    let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
     let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
     let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
     let totalConnections = pm.switch.connManager.getConnections().len
@@ -769,4 +837,4 @@ proc start*(pm: PeerManager) =
   asyncSpawn pm.logAndMetrics()
 
 proc stop*(pm: PeerManager) =
-  pm.started = false
+  pm.started = false
\ No newline at end of file
diff --git a/waku/node/peer_manager/waku_peer_store.nim b/waku/node/peer_manager/waku_peer_store.nim
index 03a6f6e3cc..579ae395a8 100644
--- a/waku/node/peer_manager/waku_peer_store.nim
+++ b/waku/node/peer_manager/waku_peer_store.nim
@@ -4,7 +4,7 @@ else:
   {.push raises: [].}
 
 import
-  std/[tables, sequtils, sets, options, times, strutils],
+  std/[tables, sequtils, sets, options, strutils],
   chronos,
   eth/p2p/discoveryv5/enr,
   libp2p/builders,
@@ -12,6 +12,8 @@ import
 
 import
   ../../waku_core,
+  ../../waku_enr/sharding,
+  ../../waku_enr/capabilities,
   ../../common/utils/sequence
 
 export peerstore, builders
@@ -95,10 +97,13 @@ proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[RemotePeerInfo]
   peerStore.peers.filterIt(it.protocols.anyIt(protocolMatcher(it)))
 
 proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness =
-  # Return the connection state of the given, managed peer
-  # TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
-  # TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
-  return peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
+  peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
+
+proc hasShard*(peerStore: PeerStore, peerId: PeerID, cluster, shard: uint16): bool =
+  peerStore[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard)
+
+proc hasCapability*(peerStore: PeerStore, peerId: PeerID, cap: Capabilities): bool =
+  peerStore[ENRBook].book.getOrDefault(peerId).supportsCapability(cap)
 
 proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool =
   # Returns `true` if the peer is connected
@@ -131,3 +136,9 @@ proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[RemotePeerInf
 
 proc getReachablePeers*(peerStore: PeerStore): seq[RemotePeerInfo] =
   return peerStore.peers.filterIt(it.connectedness == CanConnect or it.connectedness == Connected)
+
+proc getPeersByShard*(peerStore: PeerStore, cluster, shard: uint16): seq[RemotePeerInfo] =
+  return peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().containsShard(cluster, shard))
+
+proc getPeersByCapability*(peerStore: PeerStore, cap: Capabilities): seq[RemotePeerInfo] =
+  return peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap))
\ No newline at end of file
diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim
index 82f72f8e5a..9a4b74d310 100644
--- a/waku/node/waku_node.nim
+++ b/waku/node/waku_node.nim
@@ -35,7 +35,8 @@ import
   ../waku_filter/client as legacy_filter_client, #TODO: support for legacy filter protocol will be removed
   ../waku_filter_v2,
   ../waku_filter_v2/client as filter_client,
-  ../waku_lightpush,
+  ../waku_lightpush/common,
+  ../waku_lightpush/protocol,
   ../waku_metadata,
   ../waku_lightpush/client as lightpush_client,
   ../waku_enr,
diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim
index bf4b5ebed4..d8c085065d 100644
--- a/waku/waku_api/rest/admin/handlers.nim
+++ b/waku/waku_api/rest/admin/handlers.nim
@@ -17,7 +17,7 @@ import
   ../../../waku_store,
   ../../../waku_filter,
   ../../../waku_filter_v2,
-  ../../../waku_lightpush,
+  ../../../waku_lightpush/common,
   ../../../waku_relay,
   ../../../waku_node,
   ../../../node/peer_manager,
diff --git a/waku/waku_api/rest/lightpush/handlers.nim b/waku/waku_api/rest/lightpush/handlers.nim
index df0ae9869c..20a6a1b1ab 100644
--- a/waku/waku_api/rest/lightpush/handlers.nim
+++ b/waku/waku_api/rest/lightpush/handlers.nim
@@ -5,7 +5,6 @@ else:
 
 import
   std/strformat,
-  std/sequtils,
   stew/byteutils,
   chronicles,
   json_serialization,
@@ -14,10 +13,9 @@ import
   presto/common
 
 import
-  ../../../waku_core,
   ../../waku/node/peer_manager,
   ../../../waku_node,
-  ../../waku/waku_lightpush,
+  ../../waku/waku_lightpush/common,
   ../../handlers,
   ../serdes,
   ../responses,
diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim
index bb918b3681..f7d4fae59b 100644
--- a/waku/waku_core/peers.nim
+++ b/waku/waku_core/peers.nim
@@ -84,7 +84,6 @@ proc init*(T: typedesc[RemotePeerInfo],
   let peerId = PeerID.init(peerId).tryGet()
 
   RemotePeerInfo(peerId: peerId, addrs: addrs, enr: enr, protocols: protocols)
-
 ## Parse
 
 proc validWireAddr*(ma: MultiAddress): bool =
@@ -217,11 +216,15 @@ converter toRemotePeerInfo*(peerRecord: PeerRecord): RemotePeerInfo =
 converter toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo =
   ## Converts the local peerInfo to dialable RemotePeerInfo
   ## Useful for testing or internal connections
-  RemotePeerInfo.init(
-    peerInfo.peerId,
-    peerInfo.listenAddrs,
-    none(enr.Record),
-    peerInfo.protocols
+  RemotePeerInfo(
+    peerId: peerInfo.peerId,
+    addrs: peerInfo.listenAddrs,
+    enr: none(Record),
+    protocols: peerInfo.protocols,
+
+    agent: peerInfo.agentVersion,
+    protoVersion: peerInfo.protoVersion,
+    publicKey: peerInfo.publicKey,
   )
 
 proc hasProtocol*(ma: MultiAddress, proto: string): bool =
diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim
index 2f90b6fb32..9cda80e248 100644
--- a/waku/waku_lightpush/client.nim
+++ b/waku/waku_lightpush/client.nim
@@ -14,7 +14,7 @@ import
   ../node/peer_manager,
   ../utils/requests,
   ../waku_core,
-  ./protocol,
+  ./common,
   ./protocol_metrics,
   ./rpc,
   ./rpc_codec
diff --git a/waku/waku_lightpush/common.nim b/waku/waku_lightpush/common.nim
new file mode 100644
index 0000000000..27ecb97466
--- /dev/null
+++ b/waku/waku_lightpush/common.nim
@@ -0,0 +1,21 @@
+when (NimMajor, NimMinor) < (1, 4):
+  {.push raises: [Defect].}
+else:
+  {.push raises: [].}
+
+import
+  stew/results,
+  chronos,
+  libp2p/peerid
+import
+  ../waku_core
+
+const WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
+
+type WakuLightPushResult*[T] = Result[T, string]
+
+type PushMessageHandler* = proc(
+  peer: PeerId,
+  pubsubTopic: PubsubTopic,
+  message: WakuMessage
+  ): Future[WakuLightPushResult[void]] {.gcsafe, closure.}
diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim
index 615c2fb69c..56d9ff87ee 100644
--- a/waku/waku_lightpush/protocol.nim
+++ b/waku/waku_lightpush/protocol.nim
@@ -11,26 +11,17 @@ import
   metrics,
   bearssl/rand
 import
-  ../node/peer_manager,
+  ../node/peer_manager/peer_manager,
   ../waku_core,
+  ./common,
   ./rpc,
   ./rpc_codec,
   ./protocol_metrics
-
-
+
 logScope:
   topics = "waku lightpush"
 
-
-const WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
-
-
-type
-  WakuLightPushResult*[T] = Result[T, string]
-
-  PushMessageHandler* = proc(peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage): Future[WakuLightPushResult[void]] {.gcsafe, closure.}
-
-  WakuLightPush* = ref object of LPProtocol
+type WakuLightPush* = ref object of LPProtocol
     rng*: ref rand.HmacDrbgContext
     peerManager*: PeerManager
     pushHandler*: PushMessageHandler
diff --git a/waku/waku_metadata/protocol.nim b/waku/waku_metadata/protocol.nim
index 78573650a3..1fbe1e2717 100644
--- a/waku/waku_metadata/protocol.nim
+++ b/waku/waku_metadata/protocol.nim
@@ -112,7 +112,7 @@ proc new*(T: type WakuMetadata,
 
   wm.initProtocolHandler()
 
-  info "Created WakuMetadata protocol", clusterId=cluster
+  info "Created WakuMetadata protocol", clusterId=wm.clusterId, shards=wm.shards
 
   return wm