Add test to confirm garbage collection
- Create a test to verify that a node that joins
  in an async way is not going to extend the life
  of an already deleted object

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
Flavio Crisciani committed Oct 12, 2017
1 parent 659480c commit 540b97f
Showing 4 changed files with 100 additions and 37 deletions.
13 changes: 4 additions & 9 deletions networkdb/cluster.go
@@ -17,15 +17,10 @@ import (
)

const (
// The garbage collection logic for entries leverages the presence of the network.
// For this reason the expiration time of the network is set slightly higher than the entry expiration, so that
// there are at least 5 extra cycles to make sure that all the entries are properly deleted before the network itself is deleted.
reapEntryInterval = 30 * time.Minute
reapNetworkInterval = reapEntryInterval + 5*reapPeriod
reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
)

type logWriter struct{}
13 changes: 7 additions & 6 deletions networkdb/delegate.go
@@ -93,14 +93,14 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
nDB.nodes[n.Name] = n
nDB.Unlock()
if !found {
logrus.Infof("Node join event for %s/%s", n.Name, n.Addr)
logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
}
return true
case NodeEventTypeLeave:
nDB.Lock()
nDB.leftNodes[n.Name] = n
nDB.Unlock()
logrus.Infof("Node leave event for %s/%s", n.Name, n.Addr)
logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
return true
}

@@ -140,7 +140,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
n.ltime = nEvent.LTime
n.leaving = nEvent.Type == NetworkEventTypeLeave
if n.leaving {
n.reapTime = reapNetworkInterval
n.reapTime = nDB.config.reapNetworkInterval

// The remote node is leaving the network, but not the gossip cluster.
// Mark all its entries in deleted state, this will guarantee that
@@ -216,8 +216,9 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
// This case can happen if the cluster is running different versions of the engine where the old version does not have the
// field. If that is not the case, this can be a BUG
if e.deleting && e.reapTime == 0 {
logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent)
e.reapTime = reapEntryInterval
logrus.Warnf("%v(%v) handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?",
nDB.config.Hostname, nDB.config.NodeID, tEvent)
e.reapTime = nDB.config.reapEntryInterval
}

nDB.Lock()
@@ -229,7 +230,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
// If the residual reapTime is lower than or equal to 1/6 of the total reapTime, don't bother broadcasting it around:
// most likely the cluster is already aware of it; if not, whoever syncs with this node will catch the state too.
// This also prevents the deletion of entries close to their garbage collection from circulating forever
return e.reapTime > reapEntryInterval/6
return e.reapTime > nDB.config.reapEntryInterval/6
}

var op opType
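To make the rebroadcast threshold described in the comment above concrete, here is a small self-contained sketch; shouldRebroadcast is a hypothetical helper name used for illustration only, not part of this change:

package main

import (
	"fmt"
	"time"
)

// shouldRebroadcast mirrors the check in handleTableEvent: once the residual
// reapTime of a deleted entry drops to 1/6 of the configured reapEntryInterval
// or below, the delete event is no longer gossiped further, on the assumption
// that the rest of the cluster already knows about it.
func shouldRebroadcast(residualReapTime, reapEntryInterval time.Duration) bool {
	return residualReapTime > reapEntryInterval/6
}

func main() {
	interval := 30 * time.Minute // the default reapEntryInterval
	fmt.Println(shouldRebroadcast(20*time.Minute, interval)) // true: keep gossiping the delete
	fmt.Println(shouldRebroadcast(4*time.Minute, interval))  // false: below the 5m threshold, drop it
}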
21 changes: 17 additions & 4 deletions networkdb/networkdb.go
@@ -181,6 +181,13 @@ type Config struct {
// be able to increase this to get more content into each gossip packet.
PacketBufferSize int

// reapEntryInterval is the duration for which a deleted entry is kept before being garbage collected
reapEntryInterval time.Duration

// reapNetworkInterval is the duration for which a deleted network is kept before being garbage collected
// NOTE this MUST always be higher than reapEntryInterval
reapNetworkInterval time.Duration

// StatsPrintPeriod the period to use to print queue stats
// Default is 5min
StatsPrintPeriod time.Duration
@@ -220,12 +227,18 @@ func DefaultConfig() *Config {
PacketBufferSize: 1400,
StatsPrintPeriod: 5 * time.Minute,
HealthPrintPeriod: 1 * time.Minute,
reapEntryInterval: 30 * time.Minute,
}
}

// New creates a new instance of NetworkDB using the Config passed by
// the caller.
func New(c *Config) (*NetworkDB, error) {
// The garbage collection logic for entries leverages the presence of the network.
// For this reason the expiration time of the network is set slightly higher than the entry expiration, so that
// there are at least 5 extra cycles to make sure that all the entries are properly deleted before the network itself is deleted.
c.reapNetworkInterval = c.reapEntryInterval + 5*reapPeriod

nDB := &NetworkDB{
config: c,
indexes: make(map[int]*radix.Tree),
@@ -241,7 +254,7 @@ func New(c *Config) (*NetworkDB, error) {
nDB.indexes[byTable] = radix.New()
nDB.indexes[byNetwork] = radix.New()

logrus.Debugf("New memberlist node - Node:%v will use memberlist nodeID:%v", c.Hostname, c.NodeID)
logrus.Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c)
if err := nDB.clusterInit(); err != nil {
return nil, err
}
@@ -414,7 +427,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
node: nDB.config.NodeID,
value: value,
deleting: true,
reapTime: reapEntryInterval,
reapTime: nDB.config.reapEntryInterval,
}

if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
@@ -487,7 +500,7 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
node: oldEntry.node,
value: oldEntry.value,
deleting: true,
reapTime: reapEntryInterval,
reapTime: nDB.config.reapEntryInterval,
}

// we arrived at this point in 2 cases:
@@ -632,7 +645,7 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {

logrus.Debugf("%v(%v): leaving network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
n.ltime = ltime
n.reapTime = reapNetworkInterval
n.reapTime = nDB.config.reapNetworkInterval
n.leaving = true
return nil
}
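As a worked example of the derivation New() now performs, a standalone sketch using the values visible in this diff (the 30-minute reapEntryInterval from DefaultConfig and the 5-second reapPeriod constant in cluster.go):

package main

import (
	"fmt"
	"time"
)

func main() {
	reapEntryInterval := 30 * time.Minute // DefaultConfig value
	reapPeriod := 5 * time.Second         // constant in cluster.go

	// A deleted network is kept 5 reap cycles longer than its entries, so all
	// entries are garbage collected before the network itself is removed.
	reapNetworkInterval := reapEntryInterval + 5*reapPeriod

	fmt.Println(reapNetworkInterval) // 30m25s
}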
90 changes: 72 additions & 18 deletions networkdb/networkdb_test.go
@@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/docker/docker/pkg/stringid"
"github.com/docker/go-events"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
@@ -27,13 +28,14 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*NetworkDB {
func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
var dbs []*NetworkDB
for i := 0; i < num; i++ {
conf := DefaultConfig()
conf.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
conf.BindPort = int(atomic.AddInt32(&dbPort, 1))
db, err := New(conf)
localConfig := *conf
localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
db, err := New(&localConfig)
require.NoError(t, err)

if i != 0 {
@@ -44,10 +46,19 @@ func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*Netwo
dbs = append(dbs, db)
}

// Check that the cluster is properly created
for i := 0; i < num; i++ {
if num != len(dbs[i].ClusterPeers()) {
t.Fatalf("Number of nodes for %s into the cluster does not match %d != %d",
dbs[i].config.Hostname, num, len(dbs[i].ClusterPeers()))
}
}

return dbs
}

func closeNetworkDBInstances(dbs []*NetworkDB) {
log.Print("Closing DB instances...")
for _, db := range dbs {
db.Close()
}
@@ -147,12 +158,12 @@ func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, k
}

func TestNetworkDBSimple(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
closeNetworkDBInstances(dbs)
}

func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())

err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
@@ -167,7 +178,7 @@ func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
}

func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())

n := 10
for i := 1; i <= n; i++ {
@@ -210,7 +221,7 @@ func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
}

func TestNetworkDBCRUDTableEntry(t *testing.T) {
dbs := createNetworkDBInstances(t, 3, "node")
dbs := createNetworkDBInstances(t, 3, "node", DefaultConfig())

err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
@@ -240,7 +251,7 @@ func TestNetworkDBCRUDTableEntry(t *testing.T) {
}

func TestNetworkDBCRUDTableEntries(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())

err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
@@ -308,7 +319,7 @@ func TestNetworkDBCRUDTableEntries(t *testing.T) {
}

func TestNetworkDBNodeLeave(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())

err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
@@ -327,7 +338,7 @@ func TestNetworkDBNodeLeave(t *testing.T) {
}

func TestNetworkDBWatch(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)

@@ -356,7 +367,7 @@ func TestNetworkDBWatch(t *testing.T) {
}

func TestNetworkDBBulkSync(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())

err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
@@ -389,7 +400,7 @@ func TestNetworkDBBulkSync(t *testing.T) {
func TestNetworkDBCRUDMediumCluster(t *testing.T) {
n := 5

dbs := createNetworkDBInstances(t, n, "node")
dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())

for i := 0; i < n; i++ {
for j := 0; j < n; j++ {
@@ -433,13 +444,12 @@ func TestNetworkDBCRUDMediumCluster(t *testing.T) {
dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
}

log.Print("Closing DB instances...")
closeNetworkDBInstances(dbs)
}

func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
maxRetry := 5
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())

// Single node Join/Leave
err := dbs[0].JoinNetwork("network1")
@@ -517,6 +527,50 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
}

dbs[0].Close()
dbs[1].Close()
closeNetworkDBInstances(dbs)
}

func TestNetworkDBGarbageCollection(t *testing.T) {
keysWriteDelete := 5
config := DefaultConfig()
config.reapEntryInterval = time.Minute
config.StatsPrintPeriod = 30 * time.Second

dbs := createNetworkDBInstances(t, 3, "node", config)

// 2 Nodes join network
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)

err = dbs[1].JoinNetwork("network1")
assert.NoError(t, err)

for i := 0; i < keysWriteDelete; i++ {
err = dbs[i%2].CreateEntry("testTable", "network1", fmt.Sprintf("key-%d", i), []byte("value"))
assert.NoError(t, err)
}
time.Sleep(time.Second)
for i := 0; i < keysWriteDelete; i++ {
err = dbs[i%2].DeleteEntry("testTable", "network1", fmt.Sprintf("key-%d", i))
assert.NoError(t, err)
}
for i := 0; i < 2; i++ {
assert.Equal(t, keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries number should match")
}

// from this point the garbage collection timer for the deleted entries has started; wait 30 seconds and then join a new node
time.Sleep(30 * time.Second)

err = dbs[2].JoinNetwork("network1")
assert.NoError(t, err)
for i := 0; i < 3; i++ {
assert.Equal(t, keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries number should match")
}
// wait for the garbage collection to complete; after this sleep all the entries should have been deleted
time.Sleep(40 * time.Second)
for i := 0; i < 3; i++ {
assert.Equal(t, 0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries should have been garbage collected")
}

closeNetworkDBInstances(dbs)
}
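For reference, an approximate timeline of the new garbage-collection test, assuming reapEntryInterval is the one minute configured above and the reaper fires every 5-second reapPeriod (exact boundaries may shift by one cycle):

- t ≈ 0s: node1 and node2 create and then delete the 5 keys; every deleted entry starts with a reapTime of 1 minute.
- t ≈ 31s: node3 joins network1 and syncs the deleted entries; all 3 nodes still report entriesNumber == 5.
- t ≈ 71s: the 1-minute reapTime has expired everywhere; entriesNumber drops to 0 on all 3 nodes.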
