Skip to content

Commit 13064e4

Browse files
anrsanrs
anrs
authored andcommitted
refactor: wraps grant and keepalive.
1 parent 14b8e7c commit 13064e4

File tree

5 files changed

+250
-95
lines changed

5 files changed

+250
-95
lines changed

store/etcdv3/meta/etcd.go

+56-10
Original file line numberDiff line numberDiff line change
@@ -228,19 +228,65 @@ func (e *ETCD) BatchUpdate(ctx context.Context, data map[string]string, opts ...
228228
return e.batchUpdate(ctx, data, opts...)
229229
}
230230

231-
// Grant creates a new lease.
232-
func (e *ETCD) Grant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) {
233-
return e.cliv3.Grant(ctx, ttl)
234-
}
235-
236231
// KeepAliveOnce keeps on a lease alive.
237-
func (e *ETCD) KeepAliveOnce(ctx context.Context, id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error) {
238-
return e.cliv3.KeepAliveOnce(ctx, id)
232+
func (e *ETCD) BindStatus(ctx context.Context, entityKey, statusKey, statusValue string, ttl int64) error {
233+
updateStatus := []clientv3.Op{clientv3.OpPut(statusKey, statusValue)}
234+
if ttl != 0 {
235+
lease, err := e.Grant(ctx, ttl)
236+
if err != nil {
237+
return err
238+
}
239+
updateStatus = []clientv3.Op{clientv3.OpPut(statusKey, statusValue, clientv3.WithLease(lease.ID))}
240+
}
241+
242+
entityTxn, err := e.cliv3.Txn(ctx).
243+
If(clientv3.Compare(clientv3.Version(entityKey), "!=", 0)).
244+
Then( // making sure there's an exists entity kv-pair.
245+
clientv3.OpTxn(
246+
[]clientv3.Cmp{clientv3.Compare(clientv3.Version(statusKey), "!=", 0)}, // Is the status exists?
247+
[]clientv3.Op{clientv3.OpTxn( // there's an exists status
248+
[]clientv3.Cmp{clientv3.Compare(clientv3.Value(statusKey), "=", statusValue)},
249+
[]clientv3.Op{clientv3.OpGet(statusKey)}, // The status hasn't been changed.
250+
updateStatus, // The status had been changed.
251+
)},
252+
updateStatus, // there isn't a status
253+
),
254+
).Commit()
255+
if err != nil {
256+
return err
257+
}
258+
259+
// There isn't the entity kv pair.
260+
if !entityTxn.Succeeded {
261+
return nil
262+
}
263+
264+
// There isn't a status bound to the entity.
265+
statusTxn := entityTxn.Responses[0].GetResponseTxn()
266+
if !statusTxn.Succeeded {
267+
return nil
268+
}
269+
270+
// A zero TTL means it doesn't affect anything
271+
if ttl == 0 {
272+
return nil
273+
}
274+
275+
// There is a status bound to the entity yet but its value isn't same as the expected one.
276+
valueTxn := statusTxn.Responses[0].GetResponseTxn()
277+
if !valueTxn.Succeeded {
278+
return nil
279+
}
280+
281+
// Gets the lease ID which binds onto the status, and renew it one round.
282+
origLeaseID := clientv3.LeaseID(valueTxn.Responses[0].GetResponseRange().Kvs[0].Lease)
283+
_, err = e.cliv3.KeepAliveOnce(ctx, origLeaseID)
284+
return err
239285
}
240286

241-
// Txn creates a new Txn
242-
func (e *ETCD) Txn(ctx context.Context) clientv3.Txn {
243-
return e.cliv3.Txn(ctx)
287+
// Grant creates a new lease.
288+
func (e *ETCD) Grant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) {
289+
return e.cliv3.Grant(ctx, ttl)
244290
}
245291

246292
func (e *ETCD) batchUpdate(ctx context.Context, data map[string]string, opts ...clientv3.OpOption) (*clientv3.TxnResponse, error) {

store/etcdv3/meta/etcd_test.go

+182-10
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"github.com/stretchr/testify/mock"
99
"github.com/stretchr/testify/require"
1010
"go.etcd.io/etcd/v3/clientv3"
11+
"go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
12+
"go.etcd.io/etcd/v3/mvcc/mvccpb"
1113

1214
"github.com/projecteru2/core/store/etcdv3/meta/mocks"
1315
"github.com/projecteru2/core/types"
@@ -61,20 +63,190 @@ func TestGrant(t *testing.T) {
6163
require.Nil(t, resp)
6264
}
6365

64-
func TestKeepAliveOnce(t *testing.T) {
65-
e := NewMockedETCD(t)
66+
func TestBindStatusFailedAsGrantError(t *testing.T) {
67+
e, etcd, assert := testKeepAliveETCD(t)
68+
defer assert()
6669
expErr := fmt.Errorf("exp")
67-
e.cliv3.(*mocks.ETCDClientV3).On("KeepAliveOnce", mock.Anything, mock.Anything).Return(nil, expErr)
68-
resp, err := e.KeepAliveOnce(context.Background(), 1)
69-
require.Equal(t, expErr, err)
70-
require.Nil(t, resp)
70+
etcd.On("Grant", mock.Anything, mock.Anything).Return(nil, expErr).Once()
71+
require.Equal(t, expErr, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
72+
}
73+
74+
func TestBindStatusFailedAsCommitError(t *testing.T) {
75+
e, etcd, assert := testKeepAliveETCD(t)
76+
defer assert()
77+
78+
expErr := fmt.Errorf("exp")
79+
txn := &mocks.Txn{}
80+
defer txn.AssertExpectations(t)
81+
txn.On("If", mock.Anything).Return(txn).Once()
82+
txn.On("Then", mock.Anything).Return(txn).Once()
83+
txn.On("Commit").Return(nil, expErr).Once()
84+
85+
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil).Once()
86+
etcd.On("Txn", mock.Anything).Return(txn).Once()
87+
require.Equal(t, expErr, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
7188
}
7289

73-
func TestTxn(t *testing.T) {
90+
func TestBindStatusButEntityTxnUnsuccessful(t *testing.T) {
91+
e, etcd, assert := testKeepAliveETCD(t)
92+
defer assert()
93+
94+
entityTxn := &clientv3.TxnResponse{Succeeded: false}
95+
txn := &mocks.Txn{}
96+
defer txn.AssertExpectations(t)
97+
txn.On("If", mock.Anything).Return(txn).Once()
98+
txn.On("Then", mock.Anything).Return(txn).Once()
99+
txn.On("Commit").Return(entityTxn, nil)
100+
101+
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil).Once()
102+
etcd.On("Txn", mock.Anything).Return(txn).Once()
103+
require.Equal(t, nil, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
104+
}
105+
106+
func TestBindStatusButStatusTxnUnsuccessful(t *testing.T) {
107+
e, etcd, assert := testKeepAliveETCD(t)
108+
defer assert()
109+
110+
entityTxn := &clientv3.TxnResponse{
111+
Succeeded: true,
112+
Responses: []*etcdserverpb.ResponseOp{
113+
&etcdserverpb.ResponseOp{
114+
Response: &etcdserverpb.ResponseOp_ResponseTxn{
115+
// statusTxn
116+
ResponseTxn: &etcdserverpb.TxnResponse{Succeeded: false},
117+
},
118+
},
119+
},
120+
}
121+
txn := &mocks.Txn{}
122+
defer txn.AssertExpectations(t)
123+
txn.On("If", mock.Anything).Return(txn).Once()
124+
txn.On("Then", mock.Anything).Return(txn).Once()
125+
txn.On("Commit").Return(entityTxn, nil)
126+
127+
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil).Once()
128+
etcd.On("Txn", mock.Anything).Return(txn).Once()
129+
require.Equal(t, nil, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
130+
}
131+
132+
func TestBindStatusWithZeroTTL(t *testing.T) {
133+
e, etcd, assert := testKeepAliveETCD(t)
134+
defer assert()
135+
136+
entityTxn := &clientv3.TxnResponse{
137+
Succeeded: true,
138+
Responses: []*etcdserverpb.ResponseOp{
139+
&etcdserverpb.ResponseOp{
140+
Response: &etcdserverpb.ResponseOp_ResponseTxn{
141+
// statusTxn
142+
ResponseTxn: &etcdserverpb.TxnResponse{Succeeded: true},
143+
},
144+
},
145+
},
146+
}
147+
txn := &mocks.Txn{}
148+
defer txn.AssertExpectations(t)
149+
txn.On("If", mock.Anything).Return(txn).Once()
150+
txn.On("Then", mock.Anything).Return(txn).Once()
151+
txn.On("Commit").Return(entityTxn, nil)
152+
153+
etcd.On("Txn", mock.Anything).Return(txn).Once()
154+
require.Equal(t, nil, e.BindStatus(context.Background(), "/entity", "/status", "status", 0))
155+
}
156+
157+
func TestBindStatusButValueTxnUnsuccessful(t *testing.T) {
158+
e, etcd, assert := testKeepAliveETCD(t)
159+
defer assert()
160+
161+
statusTxn := &etcdserverpb.TxnResponse{
162+
Succeeded: true,
163+
Responses: []*etcdserverpb.ResponseOp{
164+
&etcdserverpb.ResponseOp{
165+
Response: &etcdserverpb.ResponseOp_ResponseTxn{
166+
// valueTxn
167+
ResponseTxn: &etcdserverpb.TxnResponse{Succeeded: false},
168+
},
169+
},
170+
},
171+
}
172+
entityTxn := &clientv3.TxnResponse{
173+
Succeeded: true,
174+
Responses: []*etcdserverpb.ResponseOp{
175+
&etcdserverpb.ResponseOp{
176+
Response: &etcdserverpb.ResponseOp_ResponseTxn{
177+
// statusTxn
178+
ResponseTxn: statusTxn,
179+
},
180+
},
181+
},
182+
}
183+
txn := &mocks.Txn{}
184+
defer txn.AssertExpectations(t)
185+
txn.On("If", mock.Anything).Return(txn).Once()
186+
txn.On("Then", mock.Anything).Return(txn).Once()
187+
txn.On("Commit").Return(entityTxn, nil)
188+
189+
etcd.On("Txn", mock.Anything).Return(txn).Once()
190+
require.Equal(t, nil, e.BindStatus(context.Background(), "/entity", "/status", "status", 0))
191+
}
192+
193+
func TestBindStatus(t *testing.T) {
194+
e, etcd, assert := testKeepAliveETCD(t)
195+
defer assert()
196+
197+
leaseID := int64(1235)
198+
valueTxn := &etcdserverpb.TxnResponse{
199+
Succeeded: true,
200+
Responses: []*etcdserverpb.ResponseOp{
201+
&etcdserverpb.ResponseOp{
202+
Response: &etcdserverpb.ResponseOp_ResponseRange{
203+
ResponseRange: &etcdserverpb.RangeResponse{
204+
Kvs: []*mvccpb.KeyValue{
205+
&mvccpb.KeyValue{Lease: leaseID},
206+
},
207+
},
208+
},
209+
},
210+
},
211+
}
212+
statusTxn := &etcdserverpb.TxnResponse{
213+
Succeeded: true,
214+
Responses: []*etcdserverpb.ResponseOp{
215+
&etcdserverpb.ResponseOp{
216+
Response: &etcdserverpb.ResponseOp_ResponseTxn{
217+
ResponseTxn: valueTxn,
218+
},
219+
},
220+
},
221+
}
222+
entityTxn := &clientv3.TxnResponse{
223+
Succeeded: true,
224+
Responses: []*etcdserverpb.ResponseOp{
225+
&etcdserverpb.ResponseOp{
226+
Response: &etcdserverpb.ResponseOp_ResponseTxn{
227+
// statusTxn
228+
ResponseTxn: statusTxn,
229+
},
230+
},
231+
},
232+
}
233+
txn := &mocks.Txn{}
234+
defer txn.AssertExpectations(t)
235+
txn.On("If", mock.Anything).Return(txn).Once()
236+
txn.On("Then", mock.Anything).Return(txn).Once()
237+
txn.On("Commit").Return(entityTxn, nil)
238+
239+
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil).Once()
240+
etcd.On("Txn", mock.Anything).Return(txn).Once()
241+
etcd.On("KeepAliveOnce", mock.Anything, clientv3.LeaseID(leaseID)).Return(nil, nil).Once()
242+
require.Equal(t, nil, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
243+
}
244+
245+
func testKeepAliveETCD(t *testing.T) (*ETCD, *mocks.ETCDClientV3, func()) {
74246
e := NewMockedETCD(t)
75-
expTxn := &mocks.Txn{}
76-
e.cliv3.(*mocks.ETCDClientV3).On("Txn", mock.Anything).Return(expTxn)
77-
require.Equal(t, expTxn, e.Txn(context.Background()))
247+
etcd, ok := e.cliv3.(*mocks.ETCDClientV3)
248+
require.True(t, ok)
249+
return e, etcd, func() { etcd.AssertExpectations(t) }
78250
}
79251

80252
func NewMockedETCD(t *testing.T) *ETCD {

store/etcdv3/meta/meta.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ import (
1313
// KV .
1414
type KV interface {
1515
Grant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error)
16-
KeepAliveOnce(ctx context.Context, id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error)
17-
Txn(context.Context) clientv3.Txn
16+
BindStatus(ctx context.Context, entityKey, statusKey, statusValue string, ttl int64) error
1817

1918
Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error)
2019
GetOne(ctx context.Context, key string, opts ...clientv3.OpOption) (*mvccpb.KeyValue, error)

store/etcdv3/meta/mocks/KV.go

+8-33
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)