Skip to content

Commit

Permalink
Feat/reduce churn (#668)
Browse files Browse the repository at this point in the history
* reduce churn
  • Loading branch information
aarshkshah1992 authored and aschmahmann committed Aug 14, 2020
1 parent fc3558c commit f580bda
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 25 deletions.
108 changes: 96 additions & 12 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p-kad-dht/rtrefresh"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"

Expand All @@ -40,6 +41,8 @@ import (
var (
logger = logging.Logger("dht")
baseLogger = logger.Desugar()

rtFreezeTimeout = 1 * time.Minute
)

const (
Expand All @@ -66,6 +69,11 @@ const (
protectedBuckets = 2
)

type addPeerRTReq struct {
p peer.ID
queryPeer bool
}

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
Expand Down Expand Up @@ -115,6 +123,7 @@ type IpfsDHT struct {

queryPeerFilter QueryFilterFunc
routingTablePeerFilter RouteTableFilterFunc
rtPeerDiversityFilter peerdiversity.PeerIPGroupFilter

autoRefresh bool

Expand All @@ -131,6 +140,11 @@ type IpfsDHT struct {
enableProviders, enableValues bool

fixLowPeersChan chan struct{}

addPeerToRTChan chan addPeerRTReq
refreshFinishedCh chan struct{}

rtFreezeTimeout time.Duration
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand Down Expand Up @@ -207,7 +221,11 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
go dht.persistRTPeersInPeerStore()

// listens to the fix low peers chan and tries to fix the Routing Table
dht.proc.Go(dht.fixLowPeersRoutine)
if !cfg.disableFixLowPeers {
dht.proc.Go(dht.fixLowPeersRoutine)
}

dht.proc.Go(dht.rtPeerLoop)

return dht, nil
}
Expand Down Expand Up @@ -271,7 +289,12 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
beta: cfg.resiliency,
queryPeerFilter: cfg.queryPeerFilter,
routingTablePeerFilter: cfg.routingTable.peerFilter,
fixLowPeersChan: make(chan struct{}, 1),
rtPeerDiversityFilter: cfg.routingTable.diversityFilter,

fixLowPeersChan: make(chan struct{}, 1),

addPeerToRTChan: make(chan addPeerRTReq),
refreshFinishedCh: make(chan struct{}),
}

var maxLastSuccessfulOutboundThreshold time.Duration
Expand Down Expand Up @@ -320,6 +343,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
}
dht.ProviderManager = pm

dht.rtFreezeTimeout = rtFreezeTimeout

return dht, nil
}

Expand All @@ -340,13 +365,28 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThr
queryFnc,
cfg.routingTable.refreshQueryTimeout,
cfg.routingTable.refreshInterval,
maxLastSuccessfulOutboundThreshold)
maxLastSuccessfulOutboundThreshold,
dht.refreshFinishedCh)

return r, err
}

func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
// make a Routing Table Diversity Filter
var filter *peerdiversity.Filter
if dht.rtPeerDiversityFilter != nil {
df, err := peerdiversity.NewFilter(dht.rtPeerDiversityFilter, "rt/diversity", func(p peer.ID) int {
return kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p))
})

if err != nil {
return nil, fmt.Errorf("failed to construct peer diversity filter: %w", err)
}

filter = df
}

rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold, filter)
cmgr := dht.host.ConnManager()

rt.PeerAdded = func(p peer.ID) {
Expand All @@ -368,6 +408,11 @@ func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThresho
return rt, err
}

// GetRoutingTableDiversityStats returns the diversity stats for the Routing Table.
func (d *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats {
return d.routingTable.GetDiversityStats()
}

// Mode allows introspection of the operation mode of the DHT
func (dht *IpfsDHT) Mode() ModeOpt {
return dht.auto
Expand Down Expand Up @@ -554,6 +599,50 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
return dht.datastore.Put(mkDsKey(key), data)
}

func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
bootstrapCount := 0
isBootsrapping := false
var timerCh <-chan time.Time

for {
select {
case <-timerCh:
dht.routingTable.MarkAllPeersIrreplaceable()
case addReq := <-dht.addPeerToRTChan:
prevSize := dht.routingTable.Size()
if prevSize == 0 {
isBootsrapping = true
bootstrapCount = 0
timerCh = nil
}
newlyAdded, err := dht.routingTable.TryAddPeer(addReq.p, addReq.queryPeer, isBootsrapping)
if err != nil {
// peer not added.
continue
}
if !newlyAdded && addReq.queryPeer {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(addReq.p, time.Now())
}
case <-dht.refreshFinishedCh:
bootstrapCount = bootstrapCount + 1
if bootstrapCount == 2 {
timerCh = time.NewTimer(dht.rtFreezeTimeout).C
}

old := isBootsrapping
isBootsrapping = false
if old {
dht.rtRefreshManager.RefreshNoWait()
}

case <-proc.Closing():
return
}
}
}

// peerFound signals the routingTable that we've found a peer that
// might support the DHT protocol.
// If we have a connection a peer but no exchange of a query RPC ->
Expand All @@ -575,16 +664,11 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
if err != nil {
logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
} else if b {
newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer)
if err != nil {
// peer not added.
select {
case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
case <-dht.ctx.Done():
return
}
if !newlyAdded && queryPeer {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
}
}
}

Expand Down
131 changes: 131 additions & 0 deletions dht_bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,134 @@ func TestDefaultBootstrappers(t *testing.T) {
}
require.Empty(t, dfmap)
}

func TestBootstrappersReplacable(t *testing.T) {
old := rtFreezeTimeout
rtFreezeTimeout = 100 * time.Millisecond
defer func() {
rtFreezeTimeout = old
}()
ctx := context.Background()
d := setupDHT(ctx, t, false, disableFixLowPeersRoutine(t), BucketSize(2))
defer d.host.Close()
defer d.Close()

var d1 *IpfsDHT
var d2 *IpfsDHT

// d1 & d2 have a cpl of 0
for {
d1 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d1.selfKey) == 0 {
break
}
}

for {
d2 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d2.selfKey) == 0 {
break
}
}
defer d1.host.Close()
defer d1.Close()

defer d2.host.Close()
defer d2.Close()

connect(t, ctx, d, d1)
connect(t, ctx, d, d2)
require.Len(t, d.routingTable.ListPeers(), 2)

// d3 & d4 with cpl=0 will go in as d1 & d2 are replacable.
var d3 *IpfsDHT
var d4 *IpfsDHT

for {
d3 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d3.selfKey) == 0 {
break
}
}

for {
d4 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d4.selfKey) == 0 {
break
}
}

defer d3.host.Close()
defer d3.Close()
defer d4.host.Close()
defer d4.Close()

connect(t, ctx, d, d3)
connect(t, ctx, d, d4)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)

// do couple of refreshes and wait for the Routing Table to be "frozen".
<-d.RefreshRoutingTable()
<-d.RefreshRoutingTable()
time.Sleep(1 * time.Second)

// adding d5 fails because RT is frozen
var d5 *IpfsDHT
for {
d5 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d5.selfKey) == 0 {
break
}
}
defer d5.host.Close()
defer d5.Close()

connectNoSync(t, ctx, d, d5)
time.Sleep(500 * time.Millisecond)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)

// Let's empty the routing table
for _, p := range d.routingTable.ListPeers() {
d.routingTable.RemovePeer(p)
}
require.Len(t, d.routingTable.ListPeers(), 0)

// adding d1 & d2 works now because there is space in the Routing Table
require.NoError(t, d.host.Network().ClosePeer(d1.self))
require.NoError(t, d.host.Network().ClosePeer(d2.self))
connect(t, ctx, d, d1)
connect(t, ctx, d, d2)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d1.self)
require.Contains(t, d.routingTable.ListPeers(), d2.self)

// adding d3 & d4 also works because the RT is not frozen.
require.NoError(t, d.host.Network().ClosePeer(d3.self))
require.NoError(t, d.host.Network().ClosePeer(d4.self))
connect(t, ctx, d, d3)
connect(t, ctx, d, d4)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)

// run refreshes and freeze the RT
<-d.RefreshRoutingTable()
<-d.RefreshRoutingTable()
time.Sleep(1 * time.Second)
// cant add d1 & d5 because RT is frozen.
require.NoError(t, d.host.Network().ClosePeer(d1.self))
require.NoError(t, d.host.Network().ClosePeer(d5.self))
connectNoSync(t, ctx, d, d1)
connectNoSync(t, ctx, d, d5)
d.peerFound(ctx, d5.self, true)
d.peerFound(ctx, d1.self, true)
time.Sleep(1 * time.Second)

require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)
}
35 changes: 30 additions & 5 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ package dht

import (
"fmt"
"testing"
"time"

ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-ipns"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-kad-dht/providers"

"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
record "github.com/libp2p/go-libp2p-record"

ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-ipns"
)

// ModeOpt describes what mode the dht should operate in
Expand Down Expand Up @@ -56,11 +60,13 @@ type config struct {
latencyTolerance time.Duration
checkInterval time.Duration
peerFilter RouteTableFilterFunc
diversityFilter peerdiversity.PeerIPGroupFilter
}

// set to true if we're operating in v1 dht compatible mode
v1CompatibleMode bool
bootstrapPeers []peer.AddrInfo
v1CompatibleMode bool
bootstrapPeers []peer.AddrInfo
disableFixLowPeers bool
}

func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { return true }
Expand Down Expand Up @@ -403,3 +409,22 @@ func BootstrapPeers(bootstrappers ...peer.AddrInfo) Option {
return nil
}
}

// RoutingTablePeerDiversityFilter configures the implementation of the `PeerIPGroupFilter` that will be used
// to construct the diversity filter for the Routing Table.
// Please see the docs for `peerdiversity.PeerIPGroupFilter` AND `peerdiversity.Filter` for more details.
func RoutingTablePeerDiversityFilter(pg peerdiversity.PeerIPGroupFilter) Option {
return func(c *config) error {
c.routingTable.diversityFilter = pg
return nil
}
}

// disableFixLowPeersRoutine disables the "fixLowPeers" routine in the DHT.
// This is ONLY for tests.
func disableFixLowPeersRoutine(t *testing.T) Option {
return func(c *config) error {
c.disableFixLowPeers = true
return nil
}
}
Loading

0 comments on commit f580bda

Please sign in to comment.