Skip to content

Commit

Permalink
test: Add DB/Node Restart tests (#1504)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #1082 

## Description

Adds DB/Node Restart tests. This will be required by Lens (e.g. to make
sure migrations are not lost on restart). IIRC I think this also found
#1482 (fixed by Fred).

---------

Co-authored-by: Fred Carle <fredcarle@users.noreply.github.com>
  • Loading branch information
AndrewSisley and fredcarle authored May 15, 2023
1 parent db4b853 commit 84f6c8f
Show file tree
Hide file tree
Showing 13 changed files with 461 additions and 41 deletions.
8 changes: 8 additions & 0 deletions net/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,14 @@ func (p *Peer) Start() error {
defer p.mu.Unlock()

// reconnect to known peers
var wg sync.WaitGroup
for _, id := range p.host.Peerstore().PeersWithAddrs() {
if id == p.host.ID() {
continue
}
wg.Add(1)
go func(id peer.ID) {
defer wg.Done()
addr := p.host.Peerstore().PeerInfo(id)
err := p.host.Connect(p.ctx, addr)
if err != nil {
Expand All @@ -155,6 +158,7 @@ func (p *Peer) Start() error {
}
}(id)
}
wg.Wait()

p2plistener, err := gostream.Listen(p.host, corenet.Protocol)
if err != nil {
Expand Down Expand Up @@ -227,6 +231,10 @@ func (p *Peer) Close() error {
log.ErrorE(p.ctx, "Error closing block service", err)
}

if err := p.host.Close(); err != nil {
log.ErrorE(p.ctx, "Error closing host", err)
}

p.cancel()
return nil
}
Expand Down
16 changes: 15 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ type Node struct {
// receives an event when a pushLog request has been processed.
pushLogEvent chan net.EvtReceivedPushLog

ctx context.Context
ctx context.Context
cancel context.CancelFunc
}

// NewNode creates a new network node instance of DefraDB, wired into libp2p.
Expand Down Expand Up @@ -155,6 +156,8 @@ func NewNode(
}
}

ctx, cancel := context.WithCancel(ctx)

peer, err := net.NewPeer(
ctx,
db,
Expand All @@ -166,6 +169,7 @@ func NewNode(
options.GRPCDialOptions,
)
if err != nil {
cancel()
return nil, fin.Cleanup(err)
}

Expand All @@ -185,6 +189,7 @@ func NewNode(
pubsub: ps,
DB: db,
ctx: ctx,
cancel: cancel,
}

n.subscribeToPeerConnectionEvents()
Expand Down Expand Up @@ -313,6 +318,8 @@ func (n *Node) WaitForPeerConnectionEvent(id peer.ID) error {
return nil
case <-time.After(evtWaitTimeout):
return errors.New("waiting for peer connection timed out")
case <-n.ctx.Done():
return nil
}
}
}
Expand All @@ -328,6 +335,8 @@ func (n *Node) WaitForPubSubEvent(id peer.ID) error {
return nil
case <-time.After(evtWaitTimeout):
return errors.New("waiting for pubsub timed out")
case <-n.ctx.Done():
return nil
}
}
}
Expand All @@ -349,6 +358,8 @@ func (n *Node) WaitForPushLogByPeerEvent(id peer.ID) error {
return nil
case <-time.After(evtWaitTimeout):
return errors.New("waiting for pushlog timed out")
case <-n.ctx.Done():
return nil
}
}
}
Expand All @@ -370,6 +381,8 @@ func (n *Node) WaitForPushLogFromPeerEvent(id peer.ID) error {
return nil
case <-time.After(evtWaitTimeout):
return errors.New("waiting for pushlog timed out")
case <-n.ctx.Done():
return nil
}
}
}
Expand Down Expand Up @@ -430,5 +443,6 @@ func newDHT(ctx context.Context, h host.Host, dsb ds.Batching) (*dualdht.DHT, er

// Close closes the node and all its services.
func (n Node) Close() error {
n.cancel()
return n.Peer.Close()
}
2 changes: 1 addition & 1 deletion tests/bench/bench_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func newBenchStoreInfo(ctx context.Context, t testing.TB) (client.DB, error) {
case "memory":
db, err = testutils.NewBadgerMemoryDB(ctx)
case "badger":
db, err = testutils.NewBadgerFileDB(ctx, t)
db, _, err = testutils.NewBadgerFileDB(ctx, t)
default:
return nil, errors.New(fmt.Sprintf("invalid storage engine backend: %s", storage))
}
Expand Down
14 changes: 14 additions & 0 deletions tests/integration/change_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
)

var skip bool
Expand Down Expand Up @@ -58,6 +60,18 @@ func DetectDbChangesPreTestChecks(
t.SkipNow()
}

if !SetupOnly {
dbDirectory := path.Join(rootDatabaseDir, t.Name())
_, err := os.Stat(dbDirectory)
if os.IsNotExist(err) {
// This is a new test that does not exist in the target branch, we should
// skip it.
t.SkipNow()
} else {
require.NoError(t, err)
}
}

return false
}

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/explain/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func getDatabases(ctx context.Context, t *testing.T) ([]databaseInfo, error) {
databases := []databaseInfo{}

for _, dbt := range testUtils.GetDatabaseTypes() {
db, err := testUtils.GetDatabase(ctx, t, dbt)
db, _, err := testUtils.GetDatabase(ctx, t, dbt)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/net/order/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,10 @@ func executeTestCase(t *testing.T, test P2PTestCase) {

// clean up
for _, n := range nodes {
n.DB.Close(ctx)
if err := n.Close(); err != nil {
log.Info(ctx, "node not closing as expected", logging.NewKV("Error", err.Error()))
}
n.DB.Close(ctx)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2022 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package peer_test

import (
"testing"

"github.com/sourcenetwork/immutable"

testUtils "github.com/sourcenetwork/defradb/tests/integration"
)

func TestP2PWithSingleDocumentSingleUpdateFromChildAndRestart(t *testing.T) {
test := testUtils.TestCase{
Actions: []any{
testUtils.RandomNetworkingConfig(),
testUtils.RandomNetworkingConfig(),
testUtils.SchemaUpdate{
Schema: `
type Users {
Name: String
Age: Int
}
`,
},
testUtils.CreateDoc{
// Create John on all nodes
Doc: `{
"Name": "John",
"Age": 21
}`,
},
testUtils.ConnectPeers{
SourceNodeID: 0,
TargetNodeID: 1,
},
testUtils.Restart{},
testUtils.UpdateDoc{
// Update John's Age on the first node only, and allow the value to sync
NodeID: immutable.Some(0),
Doc: `{
"Age": 60
}`,
},
testUtils.WaitForSync{},
testUtils.Request{
Request: `query {
Users {
Age
}
}`,
Results: []map[string]any{
{
"Age": uint64(60),
},
},
},
},
}

testUtils.ExecuteTestCase(t, []string{"Users"}, test)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2022 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package peer_replicator_test

import (
"testing"

"github.com/sourcenetwork/immutable"

testUtils "github.com/sourcenetwork/defradb/tests/integration"
)

func TestP2PPeerReplicatorWithUpdateAndRestart(t *testing.T) {
test := testUtils.TestCase{
Actions: []any{
testUtils.RandomNetworkingConfig(),
testUtils.RandomNetworkingConfig(),
testUtils.RandomNetworkingConfig(),
testUtils.SchemaUpdate{
Schema: `
type Users {
Name: String
Age: Int
}
`,
},
testUtils.CreateDoc{
Doc: `{
"Name": "John",
"Age": 21
}`,
},
testUtils.ConnectPeers{
SourceNodeID: 1,
TargetNodeID: 0,
},
testUtils.ConfigureReplicator{
SourceNodeID: 0,
TargetNodeID: 2,
},
// We need to wait and ensure that the create events are handled before
// restarting the nodes as otherwise there is no gaurentee which side of
// the restart that the create events are handled, resulting in flaky tests
testUtils.WaitForSync{},
testUtils.Restart{},
testUtils.UpdateDoc{
// Update John's Age on the first node only, and allow the value to sync
NodeID: immutable.Some(0),
Doc: `{
"Age": 60
}`,
},
testUtils.WaitForSync{},
testUtils.Request{
Request: `query {
Users {
Age
}
}`,
Results: []map[string]any{
{
"Age": uint64(60),
},
},
},
},
}

testUtils.ExecuteTestCase(t, []string{"Users"}, test)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2022 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package replicator

import (
"testing"

"github.com/sourcenetwork/immutable"

testUtils "github.com/sourcenetwork/defradb/tests/integration"
)

func TestP2POneToOneReplicatorWithRestart(t *testing.T) {
test := testUtils.TestCase{
Actions: []any{
testUtils.RandomNetworkingConfig(),
testUtils.RandomNetworkingConfig(),
testUtils.SchemaUpdate{
Schema: `
type Users {
Name: String
Age: Int
}
`,
},
testUtils.ConfigureReplicator{
SourceNodeID: 0,
TargetNodeID: 1,
},
testUtils.Restart{},
testUtils.CreateDoc{
// Create John on the first (source) node only, and allow the value to sync
NodeID: immutable.Some(0),
Doc: `{
"Name": "John",
"Age": 21
}`,
},
testUtils.WaitForSync{},
testUtils.Request{
Request: `query {
Users {
Age
}
}`,
Results: []map[string]any{
{
"Age": uint64(21),
},
},
},
},
}

testUtils.ExecuteTestCase(t, []string{"Users"}, test)
}
Loading

0 comments on commit 84f6c8f

Please sign in to comment.