-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
Copy pathdraft_test.go
120 lines (105 loc) · 3.76 KB
/
draft_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/*
* Copyright 2018 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package worker
import (
"io/ioutil"
"os"
"testing"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/raftpb"
)
func getEntryForMutation(index, startTs uint64) raftpb.Entry {
proposal := pb.Proposal{Mutations: &pb.Mutations{StartTs: startTs}}
data := make([]byte, 8+proposal.Size())
sz, err := proposal.MarshalToSizedBuffer(data)
x.Check(err)
data = data[:8+sz]
return raftpb.Entry{Index: index, Term: 1, Type: raftpb.EntryNormal, Data: data}
}
func getEntryForCommit(index, startTs, commitTs uint64) raftpb.Entry {
delta := &pb.OracleDelta{}
delta.Txns = append(delta.Txns, &pb.TxnStatus{StartTs: startTs, CommitTs: commitTs})
proposal := pb.Proposal{Delta: delta}
data := make([]byte, 8+proposal.Size())
sz, err := proposal.MarshalToSizedBuffer(data)
x.Check(err)
data = data[:8+sz]
return raftpb.Entry{Index: index, Term: 1, Type: raftpb.EntryNormal, Data: data}
}
func TestCalculateSnapshot(t *testing.T) {
dir, err := ioutil.TempDir("", "raftwal")
require.NoError(t, err)
defer os.RemoveAll(dir)
ds := raftwal.Init(dir)
defer ds.Close()
n := newNode(ds, 1, 1, "")
var entries []raftpb.Entry
// Txn: 1 -> 5 // 5 should be the ReadTs.
// Txn: 2 // Should correspond to the index. Subtract 1 from the index.
// Txn: 3 -> 4
entries = append(entries, getEntryForMutation(1, 1), getEntryForMutation(2, 3),
getEntryForMutation(3, 2), getEntryForCommit(4, 3, 4), getEntryForCommit(5, 1, 5))
require.NoError(t, n.Store.Save(&raftpb.HardState{}, entries, &raftpb.Snapshot{}))
n.Applied.SetDoneUntil(5)
posting.Oracle().RegisterStartTs(2)
snap, err := n.calculateSnapshot(0, 1)
require.NoError(t, err)
require.Equal(t, uint64(5), snap.ReadTs)
require.Equal(t, uint64(1), snap.Index)
// Check state of Raft store.
var cs raftpb.ConfState
err = n.Store.CreateSnapshot(snap.Index, &cs, nil)
require.NoError(t, err)
first, err := n.Store.FirstIndex()
require.NoError(t, err)
require.Equal(t, uint64(2), first)
last, err := n.Store.LastIndex()
require.NoError(t, err)
require.Equal(t, uint64(5), last)
// This time commit all txns.
// Txn: 7 -> 8
// Txn: 2 -> 9
entries = entries[:0]
entries = append(entries, getEntryForMutation(6, 7), getEntryForCommit(7, 7, 8),
getEntryForCommit(8, 2, 9))
require.NoError(t, n.Store.Save(&raftpb.HardState{}, entries, &raftpb.Snapshot{}))
n.Applied.SetDoneUntil(8)
posting.Oracle().ResetTxns()
snap, err = n.calculateSnapshot(0, 1)
require.NoError(t, err)
require.Equal(t, uint64(9), snap.ReadTs)
require.Equal(t, uint64(8), snap.Index)
// Check state of Raft store.
err = n.Store.CreateSnapshot(snap.Index, &cs, nil)
require.NoError(t, err)
first, err = n.Store.FirstIndex()
require.NoError(t, err)
require.Equal(t, uint64(9), first)
entries = entries[:0]
entries = append(entries, getEntryForMutation(9, 11))
require.NoError(t, n.Store.Save(&raftpb.HardState{}, entries, &raftpb.Snapshot{}))
n.Applied.SetDoneUntil(9)
snap, err = n.calculateSnapshot(0, 0)
require.NoError(t, err)
require.Nil(t, snap)
}
func TestRaftMetrics(t *testing.T) {
// TODO
}