feat: shard aware peer store pruning #2167

Merged
merged 1 commit on Dec 7, 2023
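For context before reading the diff: prunePeerStore is not called on every peer-store write; nwaku schedules it from a periodic loop inside the peer manager. The following self-contained chronos sketch shows that scheduling pattern. The PeerManager stub, the echo body, and the 10-minute interval are illustrative stand-ins, not the nwaku implementation.

```nim
# Illustrative driver only: shows how a capacity-pruning pass is typically
# scheduled. The types and the interval are stand-ins for the real peer manager.
import chronos

type PeerManager = ref object
  started: bool

proc prunePeerStore(pm: PeerManager) =
  # In nwaku this enforces the peer store capacity, as in the diff below.
  echo "pruning pass"

proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
  while pm.started:
    pm.prunePeerStore()
    await sleepAsync(chronos.minutes(10))

when isMainModule:
  let pm = PeerManager(started: true)
  asyncSpawn pm.prunePeerStoreLoop()
  runForever()
```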
Changes from all commits
85 changes: 61 additions & 24 deletions waku/node/peer_manager/peer_manager.nim
@@ -5,7 +5,7 @@ else:
 
 
 import
-  std/[options, sugar, sets, sequtils, times, strutils, math],
+  std/[options, sets, tables, sequtils, times, strutils, math],
   chronos,
   chronicles,
   metrics,
@@ -14,13 +14,11 @@ import
   libp2p/nameresolving/nameresolver
 import
   ../../common/nimchronos,
   ../../common/enr,
   ../../waku_core,
   ../../waku_relay,
+  ../../waku_enr/sharding,
   ../../waku_enr/capabilities,
-  ../../waku_store/common,
-  ../../waku_filter_v2/common,
-  ../../waku_lightpush/common,
   ../../waku_metadata,
   ./peer_store/peer_storage,
   ./waku_peer_store
@@ -728,38 +726,77 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} =
     await pm.connectToNodes(uniquePeers[i..<stop])
 
 proc prunePeerStore*(pm: PeerManager) =
-  let numPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
+  let numPeers = pm.peerStore[AddressBook].book.len
   let capacity = pm.peerStore.capacity
-  if numPeers < capacity:
+  if numPeers <= capacity:
     return
 
   debug "Peer store capacity exceeded", numPeers = numPeers, capacity = capacity
-  let peersToPrune = numPeers - capacity
-
-  # prune peers with too many failed attempts
-  var pruned = 0
-  # copy to avoid modifying the book while iterating
-  let peerKeys = toSeq(pm.peerStore[NumberFailedConnBook].book.keys)
-  for peerId in peerKeys:
-    if peersToPrune - pruned == 0:
-      break
-    if pm.peerStore[NumberFailedConnBook][peerId] >= pm.maxFailedAttempts:
-      pm.peerStore.del(peerId)
-      pruned += 1
-
-  # if we still need to prune, prune peers that are not connected
+  let pruningCount = numPeers - capacity
+  var peersToPrune: HashSet[PeerId]
+
+  # prune failed connections
+  for peerId, count in pm.peerStore[NumberFailedConnBook].book.pairs:
+    if count < pm.maxFailedAttempts:
+      continue
+
+    if peersToPrune.len >= pruningCount:
+      break
+
+    peersToPrune.incl(peerId)
+
   let notConnected = pm.peerStore.getNotConnectedPeers().mapIt(it.peerId)
-  for peerId in notConnected:
-    if peersToPrune - pruned == 0:
-      break
-    pm.peerStore.del(peerId)
-    pruned += 1
+
+  var shardlessPeers: seq[PeerId]
+  var peersByShard = initTable[uint16, seq[PeerId]]()
+
+  for peer in notConnected:
+    if not pm.peerStore[ENRBook].contains(peer):
+      shardlessPeers.add(peer)
+      continue
+
+    let record = pm.peerStore[ENRBook][peer]
+
+    let rec = record.toTyped().valueOr:
+      shardlessPeers.add(peer)
+      continue
+
+    let rs = rec.relaySharding().valueOr:
+      shardlessPeers.add(peer)
+      continue
+
+    for shard in rs.shardIds:
+      peersByShard.mgetOrPut(shard, @[peer]).add(peer)
+
+  # prune not connected peers without shard
+  for peer in shardlessPeers:
+    if peersToPrune.len >= pruningCount:
+      break
+
+    peersToPrune.incl(peer)
+
+  # calculate the avg peers per shard
+  let total = sum(toSeq(peersByShard.values).mapIt(it.len))
+  let avg = min(1, total div max(1, peersByShard.len))
+
+  # prune peers from shard with higher than avg count
+  for shard, peers in peersByShard.pairs:
+    let count = max(peers.len - avg, 0)
+    for peer in peers[0..count]:
+      if peersToPrune.len >= pruningCount:
+        break
+
+      peersToPrune.incl(peer)
Comment on lines +785 to +789

@jm-clius (Contributor) commented on Oct 30, 2023:

One thing to remember is that these peers more than likely also support other shards that we may be interested in, so removing them here also removes them from the other supported shards' slots. Not sure if it's a good idea to make this more complicated in this PR, but a possible issue I foresee is that peers that support more shards (good!) are more likely to be pruned overall. Perhaps a future improvement is to sort the peers here by number of supported shards, and prune first the peers that support fewer or no other shards?

The author (Contributor) replied:

On the flip side, peers that support fewer shards might be more decentralized (not running in a data center on beefy hardware).

A peer that supports all shards == a central server (an exaggeration, but still). We have to be careful about what we incentivize.

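A minimal sketch of the future improvement suggested above (pruning candidates ordered so that peers supporting the fewest shards go first), assuming the shard sets were already read from each peer's ENR. The selectPeersToPrune helper, its inputs, and the string PeerId are hypothetical stand-ins, not nwaku API:

```nim
# Hypothetical sketch: order pruning candidates by supported-shard count.
# In real code shardsByPeer would be built from each peer's ENR, e.g. via
# record.toTyped().relaySharding(), as the diff above does per shard.
import std/[algorithm, sequtils, sets, tables]

type PeerId = string # stand-in for the libp2p PeerId type

proc selectPeersToPrune(
    shardsByPeer: Table[PeerId, HashSet[uint16]], pruningCount: int
): seq[PeerId] =
  var candidates = toSeq(shardsByPeer.pairs)
  # Fewest supported shards first, so multi-shard peers survive longest.
  candidates.sort(
    proc(a, b: (PeerId, HashSet[uint16])): int =
      cmp(a[1].len, b[1].len)
  )
  for candidate in candidates:
    if result.len >= pruningCount:
      break
    result.add(candidate[0])

when isMainModule:
  let shardsByPeer = {
    "peerA": [0'u16].toHashSet(),
    "peerB": [0'u16, 1, 2, 3].toHashSet(),
    "peerC": [1'u16, 2].toHashSet(),
  }.toTable()
  # Prunes peerA (1 shard) then peerC (2); peerB (4 shards) is kept.
  echo selectPeersToPrune(shardsByPeer, 2)
```

Note the trade-off flagged in the reply: this ordering systematically favours peers that advertise many shards, which may skew the store toward large, centrally hosted nodes.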
-
-  let afterNumPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
+
+  for peer in peersToPrune:
+    pm.peerStore.delete(peer)
+
+  let afterNumPeers = pm.peerStore[AddressBook].book.len
+
   debug "Finished pruning peer store", beforeNumPeers = numPeers,
                                        afterNumPeers = afterNumPeers,
                                        capacity = capacity,
-                                       pruned = pruned
+                                       pruned = peersToPrune.len
 
 proc selectPeer*(pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)): Option[RemotePeerInfo] =
   debug "Selecting peer from peerstore", protocol=proto
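Two details of the averaging step in the merged code above are worth flagging; this is an editorial reading of the diff, not part of the PR. First, avg = min(1, total div max(1, peersByShard.len)) can never exceed 1: with three shards holding 10, 6, and 2 candidates, total div 3 is 6, yet min(1, 6) is 1. If the comment's intent is a per-shard average with a floor of 1, then max(1, total div max(1, peersByShard.len)) would yield 6 here (note also that peers[0..count] is an inclusive slice, so each inner loop visits count + 1 candidates). Second, mgetOrPut(shard, @[peer]).add(peer) appends to the default it has just inserted, so a newly seen shard starts with the same peer recorded twice, a behaviour easy to confirm with std/tables:

```nim
# Confirms the std/tables mgetOrPut behaviour noted above.
import std/tables

var peersByShard = initTable[uint16, seq[string]]()
# When the key is missing, the default @["peerA"] is inserted first,
# then add() appends to it, duplicating the entry.
peersByShard.mgetOrPut(0'u16, @["peerA"]).add("peerA")
echo peersByShard[0'u16] # -> @["peerA", "peerA"]
```

Passing an empty default instead, as in mgetOrPut(shard, @[]).add(peer), records each peer once per shard.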