-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reduce Routing Table churn #90
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,39 +1,77 @@ | ||
package kbucket | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p-core/test" | ||
|
||
"github.com/stretchr/testify/require" | ||
"testing" | ||
) | ||
|
||
func TestBucketMinimum(t *testing.T) { | ||
func TestBucketFindFirst(t *testing.T) { | ||
t.Parallel() | ||
|
||
b := newBucket() | ||
require.Nil(t, b.min(func(p1 *PeerInfo, p2 *PeerInfo) bool { return true })) | ||
require.Nil(t, b.findFirst(func(p1 *PeerInfo) bool { return true })) | ||
|
||
pid1 := test.RandPeerIDFatal(t) | ||
pid2 := test.RandPeerIDFatal(t) | ||
pid3 := test.RandPeerIDFatal(t) | ||
|
||
// first is min | ||
b.pushFront(&PeerInfo{Id: pid1, LastUsefulAt: time.Now()}) | ||
require.Equal(t, pid1, b.min(func(first *PeerInfo, second *PeerInfo) bool { | ||
return first.LastUsefulAt.Before(second.LastUsefulAt) | ||
// first is replacable | ||
b.pushFront(&PeerInfo{Id: pid1, replaceable: true}) | ||
require.Equal(t, pid1, b.findFirst(func(p *PeerInfo) bool { | ||
return p.replaceable | ||
}).Id) | ||
|
||
// first is till min | ||
b.pushFront(&PeerInfo{Id: pid2, LastUsefulAt: time.Now().AddDate(1, 0, 0)}) | ||
require.Equal(t, pid1, b.min(func(first *PeerInfo, second *PeerInfo) bool { | ||
return first.LastUsefulAt.Before(second.LastUsefulAt) | ||
// above peer is stll the replacable one | ||
b.pushFront(&PeerInfo{Id: pid2, replaceable: false}) | ||
require.Equal(t, pid1, b.findFirst(func(p *PeerInfo) bool { | ||
return p.replaceable | ||
}).Id) | ||
|
||
// second is the min | ||
b.pushFront(&PeerInfo{Id: pid3, LastUsefulAt: time.Now().AddDate(-1, 0, 0)}) | ||
require.Equal(t, pid3, b.min(func(first *PeerInfo, second *PeerInfo) bool { | ||
return first.LastUsefulAt.Before(second.LastUsefulAt) | ||
// new peer is replacable. | ||
b.pushFront(&PeerInfo{Id: pid3, replaceable: true}) | ||
require.Equal(t, pid3, b.findFirst(func(p *PeerInfo) bool { | ||
return p.replaceable | ||
}).Id) | ||
} | ||
|
||
func TestUpdateAllWith(t *testing.T) { | ||
t.Parallel() | ||
|
||
b := newBucket() | ||
// dont crash | ||
b.updateAllWith(func(p *PeerInfo) {}) | ||
|
||
pid1 := test.RandPeerIDFatal(t) | ||
pid2 := test.RandPeerIDFatal(t) | ||
pid3 := test.RandPeerIDFatal(t) | ||
|
||
// peer1 | ||
b.pushFront(&PeerInfo{Id: pid1, replaceable: false}) | ||
b.updateAllWith(func(p *PeerInfo) { | ||
p.replaceable = true | ||
}) | ||
require.True(t, b.getPeer(pid1).replaceable) | ||
|
||
// peer2 | ||
b.pushFront(&PeerInfo{Id: pid2, replaceable: false}) | ||
b.updateAllWith(func(p *PeerInfo) { | ||
if p.Id == pid1 { | ||
p.replaceable = false | ||
} else { | ||
p.replaceable = true | ||
} | ||
}) | ||
require.True(t, b.getPeer(pid2).replaceable) | ||
require.False(t, b.getPeer(pid1).replaceable) | ||
|
||
// peer3 | ||
b.pushFront(&PeerInfo{Id: pid3, replaceable: false}) | ||
require.False(t, b.getPeer(pid3).replaceable) | ||
b.updateAllWith(func(p *PeerInfo) { | ||
p.replaceable = true | ||
}) | ||
require.True(t, b.getPeer(pid1).replaceable) | ||
require.True(t, b.getPeer(pid2).replaceable) | ||
require.True(t, b.getPeer(pid3).replaceable) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -132,15 +132,15 @@ func (rt *RoutingTable) NPeersForCpl(cpl uint) int { | |
// the boolean value will ALWAYS be false i.e. the peer wont be added to the Routing Table it it's not already there. | ||
// | ||
// A return value of false with error=nil indicates that the peer ALREADY exists in the Routing Table. | ||
func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool) (bool, error) { | ||
func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool, isReplaceable bool) (bool, error) { | ||
rt.tabLock.Lock() | ||
defer rt.tabLock.Unlock() | ||
|
||
return rt.addPeer(p, queryPeer) | ||
return rt.addPeer(p, queryPeer, isReplaceable) | ||
} | ||
|
||
// locking is the responsibility of the caller | ||
func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { | ||
func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool, isReplaceable bool) (bool, error) { | ||
bucketID := rt.bucketIdForPeer(p) | ||
bucket := rt.buckets[bucketID] | ||
|
||
|
@@ -183,6 +183,7 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { | |
LastSuccessfulOutboundQueryAt: now, | ||
AddedAt: now, | ||
dhtId: ConvertPeerID(p), | ||
replaceable: isReplaceable, | ||
}) | ||
rt.PeerAdded(p) | ||
return true, nil | ||
|
@@ -203,27 +204,29 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { | |
LastSuccessfulOutboundQueryAt: now, | ||
AddedAt: now, | ||
dhtId: ConvertPeerID(p), | ||
replaceable: isReplaceable, | ||
}) | ||
rt.PeerAdded(p) | ||
return true, nil | ||
} | ||
} | ||
|
||
// the bucket to which the peer belongs is full. Let's try to find a peer | ||
// in that bucket with a LastSuccessfulOutboundQuery value above the maximum threshold and replace it. | ||
minLast := bucket.min(func(first *PeerInfo, second *PeerInfo) bool { | ||
return first.LastUsefulAt.Before(second.LastUsefulAt) | ||
// in that bucket which is replaceable. | ||
replaceablePeer := bucket.findFirst(func(p *PeerInfo) bool { | ||
return p.replaceable | ||
}) | ||
|
||
if time.Since(minLast.LastUsefulAt) > rt.usefulnessGracePeriod { | ||
if replaceablePeer != nil { | ||
// let's evict it and add the new peer | ||
if rt.removePeer(minLast.Id) { | ||
if rt.removePeer(replaceablePeer.Id) { | ||
bucket.pushFront(&PeerInfo{ | ||
Id: p, | ||
LastUsefulAt: lastUsefulAt, | ||
LastSuccessfulOutboundQueryAt: now, | ||
AddedAt: now, | ||
dhtId: ConvertPeerID(p), | ||
replaceable: isReplaceable, | ||
}) | ||
rt.PeerAdded(p) | ||
return true, nil | ||
|
@@ -237,6 +240,21 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { | |
return false, ErrPeerRejectedNoCapacity | ||
} | ||
|
||
// MarkAllPeersIrreplaceable marks all peers in the routing table as irreplaceable | ||
// This means that we will never replace an existing peer in the table to make space for a new peer. | ||
// However, they can still be removed by calling the `RemovePeer` API. | ||
func (rt *RoutingTable) MarkAllPeersIrreplaceable() { | ||
Comment on lines
+245
to
+248
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This feels like a function just waiting to get deprecated, as opposed to basically exposing the
Comment on lines
+245
to
+248
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd like to see how this is used in the DHT PR to ensure we don't run into race conditions There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The PR is up and I think we can keep this for now. Let me know what you think. |
||
rt.tabLock.Lock() | ||
defer rt.tabLock.Unlock() | ||
|
||
for i := range rt.buckets { | ||
b := rt.buckets[i] | ||
b.updateAllWith(func(p *PeerInfo) { | ||
p.replaceable = false | ||
}) | ||
} | ||
} | ||
|
||
// GetPeerInfos returns the peer information that we've stored in the buckets | ||
func (rt *RoutingTable) GetPeerInfos() []PeerInfo { | ||
rt.tabLock.RLock() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd still use a comparison function like
min
that determines which peer is more "evictable" than another peer. Then we go through the bucket and find the most evictable peer and check if it is actually evictable.Previously: Find oldest time, then check if time is too old
This PR: Find first replaceable
Suggestion: comparison function
if p1.replaceable return p1; if p2.replaceable return p2; return p1
. Evaluation functionreturn p.replaceable
. It's really easy to plug in the older code here if required.Note: if we wanted to enable stable sorting we would have to compare peerIDs if both results were equal (e.g. added at the same time or both replaceable), but since we just need one of them it shouldn't really matter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've made this change. Let me know what you think. And I agree, we don't need a stable sort for now.