Feat/reduce churn (#668)
* reduce churn
aarshkshah1992 authored and aschmahmann committed Aug 13, 2020
1 parent 8e56690 commit 7712d13
Showing 7 changed files with 260 additions and 38 deletions.
81 changes: 71 additions & 10 deletions dht.go
@@ -41,6 +41,8 @@ import (
var (
logger = logging.Logger("dht")
baseLogger = logger.Desugar()

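// rtFreezeTimeout is how long after the second completed refresh the Routing Table is
// "frozen" (all current peers marked irreplaceable). It is a var rather than a const so tests can shorten it.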
rtFreezeTimeout = 1 * time.Minute
)

const (
@@ -67,6 +69,11 @@ const (
protectedBuckets = 2
)

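// addPeerRTReq is a request, sent on addPeerToRTChan, to add a peer to the Routing Table.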
type addPeerRTReq struct {
p peer.ID
queryPeer bool
}

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
@@ -133,6 +140,11 @@ type IpfsDHT struct {
enableProviders, enableValues bool

fixLowPeersChan chan struct{}

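// addPeerToRTChan funnels all Routing Table additions through rtPeerLoop, and
// refreshFinishedCh is signalled by the refresh manager each time a refresh completes.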
addPeerToRTChan chan addPeerRTReq
refreshFinishedCh chan struct{}

rtFreezeTimeout time.Duration
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
@@ -209,7 +221,11 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
go dht.persistRTPeersInPeerStore()

// listens to the fix low peers chan and tries to fix the Routing Table
dht.proc.Go(dht.fixLowPeersRoutine)
if !cfg.disableFixLowPeers {
dht.proc.Go(dht.fixLowPeersRoutine)
}

dht.proc.Go(dht.rtPeerLoop)

return dht, nil
}
@@ -277,6 +293,9 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
rtPeerDiversityFilter: cfg.routingTable.diversityFilter,

fixLowPeersChan: make(chan struct{}, 1),

addPeerToRTChan: make(chan addPeerRTReq),
refreshFinishedCh: make(chan struct{}),
}

var maxLastSuccessfulOutboundThreshold time.Duration
@@ -325,6 +344,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
}
dht.ProviderManager = pm

dht.rtFreezeTimeout = rtFreezeTimeout

return dht, nil
}

@@ -345,7 +366,8 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThr
queryFnc,
cfg.routingTable.refreshQueryTimeout,
cfg.routingTable.refreshInterval,
maxLastSuccessfulOutboundThreshold)
maxLastSuccessfulOutboundThreshold,
dht.refreshFinishedCh)

return r, err
}
@@ -578,6 +600,50 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
return dht.datastore.Put(mkDsKey(key), data)
}

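// rtPeerLoop serializes all additions to the Routing Table. Once two refreshes have
// completed it arms a timer; when the timer fires, every peer currently in the table is
// marked irreplaceable, "freezing" the table so that newly discovered peers cannot evict them.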
func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
bootstrapCount := 0
isBootstrapping := false
var timerCh <-chan time.Time

for {
select {
case <-timerCh:
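// the freeze timeout fired: protect every peer currently in the table from eviction.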
dht.routingTable.MarkAllPeersIrreplaceable()
case addReq := <-dht.addPeerToRTChan:
prevSize := dht.routingTable.Size()
if prevSize == 0 {
isBootstrapping = true
bootstrapCount = 0
timerCh = nil
}
newlyAdded, err := dht.routingTable.TryAddPeer(addReq.p, addReq.queryPeer, isBootstrapping)
if err != nil {
// peer not added.
continue
}
if !newlyAdded && addReq.queryPeer {
// the peer is already in our RT, but we just successfully queried it, so give it a
// bump on the query time so we don't ping it too soon for a liveness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(addReq.p, time.Now())
}
case <-dht.refreshFinishedCh:
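// a refresh completed: after the second one, arm the freeze timer; if this refresh
// ran while bootstrapping from an empty table, immediately request another refresh.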
bootstrapCount = bootstrapCount + 1
if bootstrapCount == 2 {
timerCh = time.NewTimer(dht.rtFreezeTimeout).C
}

old := isBootstrapping
isBootstrapping = false
if old {
dht.rtRefreshManager.RefreshNoWait()
}

case <-proc.Closing():
return
}
}
}

// peerFound signals the routingTable that we've found a peer that
// might support the DHT protocol.
// If we have a connection to a peer but no exchange of a query RPC ->
@@ -599,16 +665,11 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
if err != nil {
logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
} else if b {
newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer)
if err != nil {
// peer not added.
select {
case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
case <-dht.ctx.Done():
return
}
if !newlyAdded && queryPeer {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
}
}
}
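
To make the new behaviour easier to follow outside the diff, here is a minimal, standalone sketch of the freeze-after-refresh pattern that rtPeerLoop implements: peer additions are serialized through one channel, and a freeze timer is armed once two refreshes have completed. The names here (peerID, addReq, rtLoop, refreshDone) are simplified stand-ins, not the go-libp2p-kad-dht or go-libp2p-kbucket API.

package main

import (
	"fmt"
	"time"
)

type peerID string

// addReq mirrors the shape of addPeerRTReq: a peer plus whether it was just queried.
type addReq struct {
	p         peerID
	queryPeer bool
}

// rtLoop serializes peer additions and, after two completed refreshes, starts a
// timer; when the timer fires, a real routing table would be frozen.
func rtLoop(addCh <-chan addReq, refreshDone <-chan struct{}, freezeAfter time.Duration, stop <-chan struct{}) {
	refreshes := 0
	var freezeCh <-chan time.Time // nil until the second refresh completes

	for {
		select {
		case <-freezeCh:
			fmt.Println("freeze: current peers would now be marked irreplaceable")
		case req := <-addCh:
			fmt.Printf("add peer %s (queryPeer=%v)\n", req.p, req.queryPeer)
		case <-refreshDone:
			refreshes++
			if refreshes == 2 {
				freezeCh = time.NewTimer(freezeAfter).C
			}
		case <-stop:
			return
		}
	}
}

func main() {
	addCh := make(chan addReq)
	refreshDone := make(chan struct{})
	stop := make(chan struct{})
	go rtLoop(addCh, refreshDone, 50*time.Millisecond, stop)

	addCh <- addReq{p: "peerA", queryPeer: true}
	refreshDone <- struct{}{}
	refreshDone <- struct{}{}
	time.Sleep(100 * time.Millisecond) // give the freeze timer time to fire
	close(stop)
}

In the real code above, the freeze step calls routingTable.MarkAllPeersIrreplaceable(), which is what keeps already-established peers from being churned out by newcomers.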

131 changes: 131 additions & 0 deletions dht_bootstrap_test.go
@@ -68,3 +68,134 @@ func TestDefaultBootstrappers(t *testing.T) {
}
require.Empty(t, dfmap)
}

func TestBootstrappersReplaceable(t *testing.T) {
old := rtFreezeTimeout
rtFreezeTimeout = 100 * time.Millisecond
defer func() {
rtFreezeTimeout = old
}()
ctx := context.Background()
d := setupDHT(ctx, t, false, disableFixLowPeersRoutine(t), BucketSize(2))
defer d.host.Close()
defer d.Close()

var d1 *IpfsDHT
var d2 *IpfsDHT

// d1 & d2 have a cpl of 0
for {
d1 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d1.selfKey) == 0 {
break
}
}

for {
d2 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d2.selfKey) == 0 {
break
}
}
defer d1.host.Close()
defer d1.Close()

defer d2.host.Close()
defer d2.Close()

connect(t, ctx, d, d1)
connect(t, ctx, d, d2)
require.Len(t, d.routingTable.ListPeers(), 2)

// d3 & d4 with cpl=0 will go in as d1 & d2 are replaceable.
var d3 *IpfsDHT
var d4 *IpfsDHT

for {
d3 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d3.selfKey) == 0 {
break
}
}

for {
d4 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d4.selfKey) == 0 {
break
}
}

defer d3.host.Close()
defer d3.Close()
defer d4.host.Close()
defer d4.Close()

connect(t, ctx, d, d3)
connect(t, ctx, d, d4)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)

// do a couple of refreshes and wait for the Routing Table to be "frozen".
<-d.RefreshRoutingTable()
<-d.RefreshRoutingTable()
time.Sleep(1 * time.Second)

// adding d5 fails because RT is frozen
var d5 *IpfsDHT
for {
d5 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d5.selfKey) == 0 {
break
}
}
defer d5.host.Close()
defer d5.Close()

connectNoSync(t, ctx, d, d5)
time.Sleep(500 * time.Millisecond)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)

// Let's empty the routing table
for _, p := range d.routingTable.ListPeers() {
d.routingTable.RemovePeer(p)
}
require.Len(t, d.routingTable.ListPeers(), 0)

// adding d1 & d2 works now because there is space in the Routing Table
require.NoError(t, d.host.Network().ClosePeer(d1.self))
require.NoError(t, d.host.Network().ClosePeer(d2.self))
connect(t, ctx, d, d1)
connect(t, ctx, d, d2)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d1.self)
require.Contains(t, d.routingTable.ListPeers(), d2.self)

// adding d3 & d4 also works because the RT is not frozen.
require.NoError(t, d.host.Network().ClosePeer(d3.self))
require.NoError(t, d.host.Network().ClosePeer(d4.self))
connect(t, ctx, d, d3)
connect(t, ctx, d, d4)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)

// run refreshes and freeze the RT
<-d.RefreshRoutingTable()
<-d.RefreshRoutingTable()
time.Sleep(1 * time.Second)
// can't add d1 & d5 because the RT is frozen.
require.NoError(t, d.host.Network().ClosePeer(d1.self))
require.NoError(t, d.host.Network().ClosePeer(d5.self))
connectNoSync(t, ctx, d, d1)
connectNoSync(t, ctx, d, d5)
d.peerFound(ctx, d5.self, true)
d.peerFound(ctx, d1.self, true)
time.Sleep(1 * time.Second)

require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)
}
15 changes: 13 additions & 2 deletions dht_options.go
@@ -2,6 +2,7 @@ package dht

import (
"fmt"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/host"
@@ -63,8 +64,9 @@ type config struct {
}

// set to true if we're operating in v1 dht compatible mode
v1CompatibleMode bool
bootstrapPeers []peer.AddrInfo
v1CompatibleMode bool
bootstrapPeers []peer.AddrInfo
disableFixLowPeers bool
}

func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { return true }
@@ -417,3 +419,12 @@ func RoutingTablePeerDiversityFilter(pg peerdiversity.PeerIPGroupFilter) Option
return nil
}
}

// disableFixLowPeersRoutine disables the "fixLowPeers" routine in the DHT.
// This is ONLY for tests.
func disableFixLowPeersRoutine(t *testing.T) Option {
return func(c *config) error {
c.disableFixLowPeers = true
return nil
}
}
5 changes: 2 additions & 3 deletions go.mod
@@ -11,14 +11,14 @@ require (
github.com/ipfs/go-cid v0.0.5
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-ipns v0.0.2
github.com/ipfs/go-log v1.0.4
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p v0.8.2
github.com/libp2p/go-libp2p-core v0.5.4
github.com/libp2p/go-libp2p-kbucket v0.4.3
github.com/libp2p/go-libp2p-kbucket v0.4.6
github.com/libp2p/go-libp2p-peerstore v0.2.4
github.com/libp2p/go-libp2p-record v0.1.2
github.com/libp2p/go-libp2p-routing-helpers v0.2.3
@@ -33,7 +33,6 @@ require (
github.com/multiformats/go-multistream v0.1.1
github.com/stretchr/testify v1.5.1
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1
github.com/yl2chen/cidranger v1.0.0
go.opencensus.io v0.22.3
go.uber.org/zap v1.14.1
)