Feat/reduce churn #668

Merged · 5 commits · Jun 10, 2020
Changes from all commits
81 changes: 71 additions & 10 deletions dht.go
@@ -41,6 +41,8 @@ import (
var (
logger = logging.Logger("dht")
baseLogger = logger.Desugar()

rtFreezeTimeout = 1 * time.Minute
)

const (
@@ -78,6 +80,11 @@ const (
kbucketTag = "kbucket"
)

type addPeerRTReq struct {
p peer.ID
queryPeer bool
}
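
This request type is now the only path into the routing table: every caller hands its candidate peer to the rtPeerLoop goroutine over addPeerToRTChan, so all table mutations are serialized on one goroutine and the bootstrap/freeze bookkeeping needs no locks. A minimal sketch of the hand-off, using a hypothetical queuePeerForRT helper that is not part of this PR (it mirrors the select in peerFound further down):

// queuePeerForRT is a hypothetical helper, not in this PR: it hands a
// candidate peer to rtPeerLoop, and the ctx.Done case keeps the send
// from blocking past DHT shutdown.
func (dht *IpfsDHT) queuePeerForRT(p peer.ID, queryPeer bool) {
    select {
    case dht.addPeerToRTChan <- addPeerRTReq{p: p, queryPeer: queryPeer}:
    case <-dht.ctx.Done():
    }
}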

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
@@ -144,6 +151,11 @@ type IpfsDHT struct {
enableProviders, enableValues bool

fixLowPeersChan chan struct{}

addPeerToRTChan chan addPeerRTReq
refreshFinishedCh chan struct{}

rtFreezeTimeout time.Duration
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
@@ -220,7 +232,11 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
go dht.persistRTPeersInPeerStore()

// listens to the fix low peers chan and tries to fix the Routing Table
dht.proc.Go(dht.fixLowPeersRoutine)
if !cfg.disableFixLowPeers {
dht.proc.Go(dht.fixLowPeersRoutine)
}

dht.proc.Go(dht.rtPeerLoop)

return dht, nil
}
@@ -288,6 +304,9 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
rtPeerDiversityFilter: cfg.routingTable.diversityFilter,

fixLowPeersChan: make(chan struct{}, 1),

addPeerToRTChan: make(chan addPeerRTReq),
refreshFinishedCh: make(chan struct{}),
}

var maxLastSuccessfulOutboundThreshold time.Duration
@@ -336,6 +355,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
}
dht.ProviderManager = pm

dht.rtFreezeTimeout = rtFreezeTimeout

return dht, nil
}

@@ -356,7 +377,8 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThr
queryFnc,
cfg.routingTable.refreshQueryTimeout,
cfg.routingTable.refreshInterval,
maxLastSuccessfulOutboundThreshold)
maxLastSuccessfulOutboundThreshold,
dht.refreshFinishedCh)

return r, err
}
@@ -591,6 +613,50 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
return dht.datastore.Put(mkDsKey(key), data)
}

func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
bootstrapCount := 0
isBootstrapping := false
var timerCh <-chan time.Time

for {
select {
case <-timerCh:
dht.routingTable.MarkAllPeersIrreplaceable()
case addReq := <-dht.addPeerToRTChan:
prevSize := dht.routingTable.Size()
if prevSize == 0 {
isBootstrapping = true
bootstrapCount = 0
timerCh = nil
}
newlyAdded, err := dht.routingTable.TryAddPeer(addReq.p, addReq.queryPeer, isBootstrapping)
if err != nil {
// peer not added.
continue
}
if !newlyAdded && addReq.queryPeer {
// the peer is already in our RT, but we just successfully queried it, so bump its
// last successful query time so we don't ping it too soon for a liveness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(addReq.p, time.Now())
}
case <-dht.refreshFinishedCh:
bootstrapCount = bootstrapCount + 1
if bootstrapCount == 2 {
timerCh = time.NewTimer(dht.rtFreezeTimeout).C
}

old := isBootstrapping
isBootstrapping = false
if old {
dht.rtRefreshManager.RefreshNoWait()
}

case <-proc.Closing():
return
}
}
}
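
The loop above leans on a Go idiom worth spelling out: a receive from a nil channel blocks forever, so the MarkAllPeersIrreplaceable case stays dead until timerCh is armed after the second completed refresh, and it is disarmed again (set back to nil) whenever the table empties out. A standalone sketch of just that idiom, under hypothetical names (freezeAfter and arm are not from this PR):

package main

import (
    "fmt"
    "time"
)

// freezeAfter mirrors rtPeerLoop's timer handling: the timer case is
// inert while timerCh is nil and only fires once the timer is armed.
func freezeAfter(arm <-chan struct{}, timeout time.Duration) {
    var timerCh <-chan time.Time // nil, so this case never fires yet
    for {
        select {
        case <-timerCh:
            fmt.Println("frozen: MarkAllPeersIrreplaceable would run here")
            return
        case <-arm:
            timerCh = time.NewTimer(timeout).C // arm the freeze timer
        }
    }
}

func main() {
    arm := make(chan struct{}, 1)
    arm <- struct{}{} // simulate the refresh that triggers arming
    freezeAfter(arm, 100*time.Millisecond)
}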

// peerFound signals the routingTable that we've found a peer that
// might support the DHT protocol.
// If we have a connection to a peer but no exchange of a query RPC ->
@@ -612,16 +678,11 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
if err != nil {
logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
} else if b {
newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer)
if err != nil {
// peer not added.
return
}
if !newlyAdded && queryPeer {
// the peer is already in our RT, but we just successfully queried it, so bump its
// last successful query time so we don't ping it too soon for a liveness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
}
select {
case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
case <-dht.ctx.Done():
return
}
}
}

131 changes: 131 additions & 0 deletions dht_bootstrap_test.go
@@ -68,3 +68,134 @@ func TestDefaultBootstrappers(t *testing.T) {
}
require.Empty(t, dfmap)
}

func TestBootstrappersReplacable(t *testing.T) {
old := rtFreezeTimeout
rtFreezeTimeout = 100 * time.Millisecond
defer func() {
rtFreezeTimeout = old
}()
ctx := context.Background()
d := setupDHT(ctx, t, false, disableFixLowPeersRoutine(t), BucketSize(2))
defer d.host.Close()
defer d.Close()

var d1 *IpfsDHT
var d2 *IpfsDHT

// d1 & d2 have a cpl of 0
for {
d1 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d1.selfKey) == 0 {
break
}
}

for {
d2 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d2.selfKey) == 0 {
break
}
}
defer d1.host.Close()
defer d1.Close()

defer d2.host.Close()
defer d2.Close()

connect(t, ctx, d, d1)
connect(t, ctx, d, d2)
require.Len(t, d.routingTable.ListPeers(), 2)

// d3 & d4 with cpl=0 will go in as d1 & d2 are replacable.
var d3 *IpfsDHT
var d4 *IpfsDHT

for {
d3 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d3.selfKey) == 0 {
break
}
}

for {
d4 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d4.selfKey) == 0 {
break
}
}

defer d3.host.Close()
defer d3.Close()
defer d4.host.Close()
defer d4.Close()

connect(t, ctx, d, d3)
connect(t, ctx, d, d4)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)

// do a couple of refreshes and wait for the Routing Table to be "frozen".
<-d.RefreshRoutingTable()
<-d.RefreshRoutingTable()
time.Sleep(1 * time.Second)

// adding d5 fails because RT is frozen
var d5 *IpfsDHT
for {
d5 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d5.selfKey) == 0 {
break
}
}
defer d5.host.Close()
defer d5.Close()

connectNoSync(t, ctx, d, d5)
time.Sleep(500 * time.Millisecond)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)

// Let's empty the routing table
for _, p := range d.routingTable.ListPeers() {
d.routingTable.RemovePeer(p)
}
require.Len(t, d.routingTable.ListPeers(), 0)

// adding d1 & d2 works now because there is space in the Routing Table
require.NoError(t, d.host.Network().ClosePeer(d1.self))
require.NoError(t, d.host.Network().ClosePeer(d2.self))
connect(t, ctx, d, d1)
connect(t, ctx, d, d2)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d1.self)
require.Contains(t, d.routingTable.ListPeers(), d2.self)

// adding d3 & d4 also works because the RT is not frozen.
require.NoError(t, d.host.Network().ClosePeer(d3.self))
require.NoError(t, d.host.Network().ClosePeer(d4.self))
connect(t, ctx, d, d3)
connect(t, ctx, d, d4)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)

// run refreshes and freeze the RT
<-d.RefreshRoutingTable()
<-d.RefreshRoutingTable()
time.Sleep(1 * time.Second)
// cant add d1 & d5 because RT is frozen.
require.NoError(t, d.host.Network().ClosePeer(d1.self))
require.NoError(t, d.host.Network().ClosePeer(d5.self))
connectNoSync(t, ctx, d, d1)
connectNoSync(t, ctx, d, d5)
d.peerFound(ctx, d5.self, true)
d.peerFound(ctx, d1.self, true)
time.Sleep(1 * time.Second)

require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)
}
15 changes: 13 additions & 2 deletions dht_options.go
@@ -2,6 +2,7 @@ package dht

import (
"fmt"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/host"
@@ -63,8 +64,9 @@
}

// set to true if we're operating in v1 dht compatible mode
v1CompatibleMode bool
bootstrapPeers []peer.AddrInfo
v1CompatibleMode bool
bootstrapPeers []peer.AddrInfo
disableFixLowPeers bool
}

func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { return true }
@@ -417,3 +419,12 @@ func RoutingTablePeerDiversityFilter(pg peerdiversity.PeerIPGroupFilter) Option
return nil
}
}

// disableFixLowPeersRoutine disables the "fixLowPeers" routine in the DHT.
// This is ONLY for tests.
func disableFixLowPeersRoutine(t *testing.T) Option {
return func(c *config) error {
c.disableFixLowPeers = true
return nil
}
}
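
The new test above wires this in next to the other options, so the routing table is only mutated by the code paths under test:

d := setupDHT(ctx, t, false, disableFixLowPeersRoutine(t), BucketSize(2))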
2 changes: 1 addition & 1 deletion go.mod
@@ -18,7 +18,7 @@ require (
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p v0.8.2
github.com/libp2p/go-libp2p-core v0.5.4
github.com/libp2p/go-libp2p-kbucket v0.4.3
github.com/libp2p/go-libp2p-kbucket v0.4.4
github.com/libp2p/go-libp2p-peerstore v0.2.4
github.com/libp2p/go-libp2p-record v0.1.2
github.com/libp2p/go-libp2p-routing-helpers v0.2.3
6 changes: 6 additions & 0 deletions go.sum
@@ -205,6 +205,12 @@ github.com/libp2p/go-libp2p-kbucket v0.4.3-0.20200603162158-145d6af2e842 h1:Co5A
github.com/libp2p/go-libp2p-kbucket v0.4.3-0.20200603162158-145d6af2e842/go.mod h1:/PMj5dxV7yebkcXg7SD3OtXFdMNeqtS6UnByTRu1+Is=
github.com/libp2p/go-libp2p-kbucket v0.4.3 h1:6SWZ52TWpAUwg8vk8r9ApwYsnPhN67kBwvQhdnXF8KQ=
github.com/libp2p/go-libp2p-kbucket v0.4.3/go.mod h1:/PMj5dxV7yebkcXg7SD3OtXFdMNeqtS6UnByTRu1+Is=
github.com/libp2p/go-libp2p-kbucket v0.4.4-0.20200603184121-1bd4943d8154 h1:2JwJomxuu+OvjAhzSzUm/RHTeIN6tlNXWw4NyMSdD5Q=
github.com/libp2p/go-libp2p-kbucket v0.4.4-0.20200603184121-1bd4943d8154/go.mod h1:/PMj5dxV7yebkcXg7SD3OtXFdMNeqtS6UnByTRu1+Is=
github.com/libp2p/go-libp2p-kbucket v0.4.4-0.20200604064949-a1d5ae565be4 h1:75ebf31AFS2kXssuZEXdgQUkUUYGtma4AHexsrr3WBk=
github.com/libp2p/go-libp2p-kbucket v0.4.4-0.20200604064949-a1d5ae565be4/go.mod h1:/PMj5dxV7yebkcXg7SD3OtXFdMNeqtS6UnByTRu1+Is=
github.com/libp2p/go-libp2p-kbucket v0.4.4 h1:XVYbmaO3lnscFxnrVO/bTMQx8HPRMgJuTFlgVv6Hqz4=
github.com/libp2p/go-libp2p-kbucket v0.4.4/go.mod h1:/PMj5dxV7yebkcXg7SD3OtXFdMNeqtS6UnByTRu1+Is=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=
12 changes: 11 additions & 1 deletion rtrefresh/rt_refresh_manager.go
@@ -49,14 +49,17 @@ type RtRefreshManager struct {
successfulOutboundQueryGracePeriod time.Duration

triggerRefresh chan *triggerRefreshReq // channel to write refresh requests to.

refreshDoneCh chan struct{} // write to this channel after every refresh
}

func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool,
refreshKeyGenFnc func(cpl uint) (string, error),
refreshQueryFnc func(ctx context.Context, key string) error,
refreshQueryTimeout time.Duration,
refreshInterval time.Duration,
successfulOutboundQueryGracePeriod time.Duration) (*RtRefreshManager, error) {
successfulOutboundQueryGracePeriod time.Duration,
refreshDoneCh chan struct{}) (*RtRefreshManager, error) {

ctx, cancel := context.WithCancel(context.Background())
return &RtRefreshManager{
@@ -75,6 +78,7 @@ func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool
successfulOutboundQueryGracePeriod: successfulOutboundQueryGracePeriod,

triggerRefresh: make(chan *triggerRefreshReq),
refreshDoneCh: refreshDoneCh,
}, nil
}

@@ -235,6 +239,12 @@ func (r *RtRefreshManager) doRefresh(forceRefresh bool) error {
}
}

select {
case r.refreshDoneCh <- struct{}{}:
case <-r.ctx.Done():
return r.ctx.Err()
}

return merr
}
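
With this change, doRefresh reports every completed refresh, successful or not, over refreshDoneCh; rtPeerLoop counts these to arm the freeze timer after the second one. The send is unbuffered and paired with ctx.Done so a refresh that finishes during shutdown cannot deadlock against a consumer that has already exited. The shape of that send in isolation (notify is a hypothetical name, not from this PR):

package rtrefresh

import "context"

// notify blocks until the consumer receives, or bails out once ctx is
// cancelled: the same shape as the send at the end of doRefresh above.
func notify(ctx context.Context, done chan<- struct{}) error {
    select {
    case done <- struct{}{}:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}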
