Skip to content

Commit 33c735d

Browse files
zcCMGS
zc
authored andcommitted
test and fix transaction during creating (#215)
1 parent 64d3c5d commit 33c735d

File tree

5 files changed

+311
-10
lines changed

5 files changed

+311
-10
lines changed

cluster/calcium/create_test.go

+270
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@ import (
44
"context"
55
"testing"
66

7+
"github.com/pkg/errors"
8+
enginemocks "github.com/projecteru2/core/engine/mocks"
9+
enginetypes "github.com/projecteru2/core/engine/types"
10+
lockmocks "github.com/projecteru2/core/lock/mocks"
11+
schedulermocks "github.com/projecteru2/core/scheduler/mocks"
12+
st "github.com/projecteru2/core/store"
713
storemocks "github.com/projecteru2/core/store/mocks"
814
"github.com/projecteru2/core/types"
915
"github.com/stretchr/testify/assert"
@@ -38,3 +44,267 @@ func TestCreateContainer(t *testing.T) {
3844
_, err = c.CreateContainer(ctx, opts)
3945
assert.Error(t, err)
4046
}
47+
48+
func TestCreateContainerTxn(t *testing.T) {
49+
c := NewTestCluster()
50+
ctx := context.Background()
51+
opts := &types.DeployOptions{
52+
Count: 2,
53+
DeployMethod: "auto",
54+
CPUQuota: 1,
55+
Image: "zc:test",
56+
Entrypoint: &types.Entrypoint{},
57+
}
58+
store := &storemocks.Store{}
59+
scheduler := &schedulermocks.Scheduler{}
60+
c.store = store
61+
c.scheduler = scheduler
62+
engine := &enginemocks.API{}
63+
64+
pod1 := &types.Pod{Name: "p1"}
65+
node1 := &types.Node{
66+
Name: "n1",
67+
Engine: engine,
68+
}
69+
node2 := &types.Node{
70+
Name: "n2",
71+
Engine: engine,
72+
}
73+
nodes := []*types.Node{node1, node2}
74+
75+
// GetPod fails
76+
store.On("GetPod", mock.Anything, mock.Anything).Return(
77+
nil,
78+
errors.Wrap(context.DeadlineExceeded, "GetPod"),
79+
).Once()
80+
_, err := c.CreateContainer(ctx, opts)
81+
assert.Error(t, err)
82+
assert.True(t, errors.Is(err, context.DeadlineExceeded))
83+
assert.Error(t, err, "GetPod")
84+
85+
// doAllocResource fails: MakeDeployStatus
86+
lock := &lockmocks.DistributedLock{}
87+
lock.On("Lock", mock.Anything).Return(nil)
88+
lock.On("Unlock", mock.Anything).Return(nil)
89+
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
90+
store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)
91+
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
92+
store.On("GetNode",
93+
mock.AnythingOfType("*context.emptyCtx"),
94+
mock.AnythingOfType("string"),
95+
).Return(
96+
func(_ context.Context, name string) (node *types.Node) {
97+
node = node1
98+
if name == "n2" {
99+
node = node2
100+
}
101+
return
102+
}, nil)
103+
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(
104+
nil,
105+
errors.Wrap(context.DeadlineExceeded, "MakeDeployStatus"),
106+
).Once()
107+
_, err = c.CreateContainer(ctx, opts)
108+
assert.Error(t, err)
109+
assert.True(t, errors.Is(err, context.DeadlineExceeded))
110+
assert.Error(t, err, "MakeDeployStatus")
111+
112+
// doAllocResource fails: UpdateNodeResource for 1st node
113+
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
114+
store.On("MakeDeployStatus", ctx, opts, mock.AnythingOfType("[]types.NodeInfo")).Return(
115+
func(_ context.Context, _ *types.DeployOptions, nodesInfo []types.NodeInfo) []types.NodeInfo {
116+
return nodesInfo
117+
}, nil)
118+
scheduler.On("SelectMemoryNodes", mock.AnythingOfType("[]types.NodeInfo"), mock.AnythingOfType("float64"), mock.AnythingOfType("int64")).Return(
119+
func(nodesInfo []types.NodeInfo, _ float64, _ int64) []types.NodeInfo {
120+
return nodesInfo
121+
}, len(nodes), nil)
122+
scheduler.On("SelectStorageNodes", mock.AnythingOfType("[]types.NodeInfo"), mock.AnythingOfType("int64")).Return(
123+
func(nodesInfo []types.NodeInfo, _ int64) []types.NodeInfo {
124+
return nodesInfo
125+
},
126+
len(nodes), nil,
127+
)
128+
scheduler.On("SelectVolumeNodes", mock.AnythingOfType("[]types.NodeInfo"), mock.AnythingOfType("types.VolumeBindings")).Return(
129+
func(nodesInfo []types.NodeInfo, _ types.VolumeBindings) []types.NodeInfo {
130+
return nodesInfo
131+
},
132+
nil, len(nodes), nil,
133+
)
134+
scheduler.On("CommonDivision", mock.AnythingOfType("[]types.NodeInfo"), mock.AnythingOfType("int"), mock.AnythingOfType("int"), mock.AnythingOfType("types.ResourceType")).Return(
135+
func(nodesInfo []types.NodeInfo, _, _ int, _ types.ResourceType) []types.NodeInfo {
136+
for i := range nodesInfo {
137+
nodesInfo[i].Deploy = 1
138+
}
139+
return nodesInfo
140+
},
141+
nil,
142+
)
143+
store.On("UpdateNodeResource",
144+
mock.AnythingOfType("*context.timerCtx"),
145+
mock.AnythingOfType("*types.Node"),
146+
mock.AnythingOfType("types.ResourceMap"),
147+
mock.AnythingOfType("float64"),
148+
mock.AnythingOfType("int64"),
149+
mock.AnythingOfType("int64"),
150+
mock.AnythingOfType("types.ResourceMap"),
151+
mock.AnythingOfType("string")).Return(
152+
func(ctx context.Context, node *types.Node, _ types.CPUMap, quota float64, _, _ int64, _ types.VolumeMap, action string) error {
153+
if action == st.ActionDecr {
154+
return errors.Wrap(context.DeadlineExceeded, "UpdateNodeResource")
155+
}
156+
if action == st.ActionIncr {
157+
quota = -quota
158+
}
159+
node.CPUUsed += quota
160+
return nil
161+
},
162+
).Once()
163+
_, err = c.CreateContainer(ctx, opts)
164+
assert.Error(t, err)
165+
assert.True(t, errors.Is(err, context.DeadlineExceeded))
166+
assert.Error(t, err, "UpdateNodeResource")
167+
assert.EqualValues(t, 0, node1.CPUUsed)
168+
assert.EqualValues(t, 0, node2.CPUUsed)
169+
170+
// doAllocResource fails: UpdateNodeResource for 2nd node
171+
cnt := 0
172+
store.On("UpdateNodeResource",
173+
mock.AnythingOfType("*context.timerCtx"),
174+
mock.AnythingOfType("*types.Node"),
175+
mock.AnythingOfType("types.ResourceMap"),
176+
mock.AnythingOfType("float64"),
177+
mock.AnythingOfType("int64"),
178+
mock.AnythingOfType("int64"),
179+
mock.AnythingOfType("types.ResourceMap"),
180+
mock.AnythingOfType("string")).Return(
181+
func(ctx context.Context, node *types.Node, _ types.CPUMap, quota float64, _, _ int64, _ types.VolumeMap, action string) error {
182+
if action == st.ActionDecr {
183+
cnt++
184+
if cnt == 2 {
185+
return errors.Wrap(context.DeadlineExceeded, "UpdateNodeResource2")
186+
}
187+
}
188+
if action == st.ActionIncr {
189+
quota = -quota
190+
}
191+
node.CPUUsed += quota
192+
return nil
193+
},
194+
).Times(3)
195+
_, err = c.CreateContainer(ctx, opts)
196+
assert.Error(t, err)
197+
assert.True(t, errors.Is(err, context.DeadlineExceeded))
198+
assert.Error(t, err, "UpdateNodeResource2")
199+
assert.EqualValues(t, 0, node1.CPUUsed)
200+
assert.EqualValues(t, 0, node2.CPUUsed)
201+
assert.EqualValues(t, 2, cnt)
202+
203+
// doAllocResource fails: SaveProcessing
204+
store.On("UpdateNodeResource",
205+
mock.AnythingOfType("*context.timerCtx"),
206+
mock.AnythingOfType("*types.Node"),
207+
mock.AnythingOfType("types.ResourceMap"),
208+
mock.AnythingOfType("float64"),
209+
mock.AnythingOfType("int64"),
210+
mock.AnythingOfType("int64"),
211+
mock.AnythingOfType("types.ResourceMap"),
212+
mock.AnythingOfType("string")).Return(
213+
func(ctx context.Context, node *types.Node, _ types.CPUMap, quota float64, _, _ int64, _ types.VolumeMap, action string) error {
214+
if action == st.ActionIncr {
215+
quota = -quota
216+
}
217+
node.CPUUsed += quota
218+
return nil
219+
},
220+
)
221+
store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "SaveProcessing")).Once()
222+
_, err = c.CreateContainer(ctx, opts)
223+
assert.Error(t, err)
224+
assert.True(t, errors.Is(err, context.DeadlineExceeded))
225+
assert.Error(t, err, "SaveProcessing")
226+
assert.EqualValues(t, 0, node1.CPUUsed)
227+
assert.EqualValues(t, 0, node2.CPUUsed)
228+
229+
// doCreateContainerOnNode fails: doGetAndPrepareNode
230+
store.On("GetNode",
231+
mock.AnythingOfType("*context.timerCtx"),
232+
mock.AnythingOfType("string"),
233+
).Return(
234+
func(_ context.Context, name string) (node *types.Node) {
235+
node = node1
236+
if name == "n2" {
237+
node = node2
238+
}
239+
return
240+
}, nil)
241+
store.On("GetNode",
242+
mock.AnythingOfType("*context.cancelCtx"),
243+
mock.AnythingOfType("string"),
244+
).Return(
245+
func(_ context.Context, name string) (node *types.Node) {
246+
node = node1
247+
if name == "n2" {
248+
node = node2
249+
}
250+
return
251+
}, nil)
252+
store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
253+
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImageLocalDigest")).Twice()
254+
engine.On("ImagePull", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImagePull")).Twice()
255+
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
256+
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
257+
ch, err := c.CreateContainer(ctx, opts)
258+
assert.Nil(t, err)
259+
for m := range ch {
260+
assert.Error(t, m.Error)
261+
assert.True(t, errors.Is(m.Error, context.DeadlineExceeded))
262+
assert.Error(t, m.Error, "ImagePull")
263+
}
264+
assert.EqualValues(t, 0, node1.CPUUsed)
265+
assert.EqualValues(t, 0, node2.CPUUsed)
266+
267+
// doCreateAndStartContainer fails: VirtualizationCreate
268+
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return([]string{""}, nil)
269+
engine.On("ImageRemoteDigest", mock.Anything, mock.Anything).Return("", nil)
270+
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "VirtualizationCreate")).Twice()
271+
ch, err = c.CreateContainer(ctx, opts)
272+
assert.Nil(t, err)
273+
for m := range ch {
274+
assert.Error(t, m.Error)
275+
assert.True(t, errors.Is(m.Error, context.DeadlineExceeded))
276+
assert.Error(t, m.Error, "VirtualizationCreate")
277+
}
278+
assert.EqualValues(t, 0, node1.CPUUsed)
279+
assert.EqualValues(t, 0, node2.CPUUsed)
280+
281+
// doCreateAndStartContainer fails: AddContainer
282+
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationCreated{ID: "c1"}, nil)
283+
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
284+
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
285+
store.On("AddContainer", mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddContainer")).Twice()
286+
engine.On("VirtualizationRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
287+
store.On("RemoveContainer", mock.Anything, mock.Anything).Return(nil).Twice()
288+
ch, err = c.CreateContainer(ctx, opts)
289+
assert.Nil(t, err)
290+
for m := range ch {
291+
assert.Error(t, m.Error)
292+
assert.True(t, errors.Is(m.Error, context.DeadlineExceeded))
293+
assert.Error(t, m.Error, "AddContainer")
294+
}
295+
assert.EqualValues(t, 0, node1.CPUUsed)
296+
assert.EqualValues(t, 0, node2.CPUUsed)
297+
298+
// doCreateAndStartContainer fails: RemoveContainer
299+
store.On("AddContainer", mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddContainer")).Twice()
300+
store.On("RemoveContainer", mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "RemoveContainer")).Twice()
301+
ch, err = c.CreateContainer(ctx, opts)
302+
assert.Nil(t, err)
303+
for m := range ch {
304+
assert.Error(t, m.Error)
305+
assert.True(t, errors.Is(m.Error, context.DeadlineExceeded))
306+
assert.Error(t, m.Error, "AddContainer")
307+
}
308+
assert.EqualValues(t, 0, node1.CPUUsed)
309+
assert.EqualValues(t, 0, node2.CPUUsed)
310+
}

cluster/calcium/resource.go

+4-8
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func (c *Calcium) doAllocResource(ctx context.Context, opts *types.DeployOptions
130130
var nodesInfo []types.NodeInfo
131131
var nodeCPUPlans map[string][]types.CPUMap
132132
var nodeVolumePlans map[string][]types.VolumePlan
133-
if err = c.withNodesLocked(ctx, opts.Podname, opts.Nodename, opts.NodeLabels, false, func(nodes map[string]*types.Node) error {
133+
return nodesInfo, c.withNodesLocked(ctx, opts.Podname, opts.Nodename, opts.NodeLabels, false, func(nodes map[string]*types.Node) error {
134134
if len(nodes) == 0 {
135135
return types.ErrInsufficientNodes
136136
}
@@ -196,7 +196,7 @@ func (c *Calcium) doAllocResource(ctx context.Context, opts *types.DeployOptions
196196
return types.ErrInsufficientRes
197197
}
198198
nodesInfo = nodesInfo[p:]
199-
var track int
199+
track := -1
200200
return utils.Txn(
201201
ctx,
202202
func(ctx context.Context) error {
@@ -223,7 +223,7 @@ func (c *Calcium) doAllocResource(ctx context.Context, opts *types.DeployOptions
223223
log.Infof("[allocResource] deploy %d to %s", nodeInfo.Deploy, nodeInfo.Name)
224224
}
225225
}()
226-
return nil
226+
return c.doBindProcessStatus(ctx, opts, nodesInfo)
227227
},
228228
func(ctx context.Context) error {
229229
for i := 0; i < track+1; i++ {
@@ -238,11 +238,7 @@ func (c *Calcium) doAllocResource(ctx context.Context, opts *types.DeployOptions
238238
},
239239
c.config.GlobalTimeout,
240240
)
241-
}); err != nil {
242-
return nil, err
243-
}
244-
245-
return nodesInfo, c.doBindProcessStatus(ctx, opts, nodesInfo)
241+
})
246242
}
247243

248244
func (c *Calcium) doBindProcessStatus(ctx context.Context, opts *types.DeployOptions, nodesInfo []types.NodeInfo) error {

cluster/calcium/resource_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ func testAllocFailedAsUpdateNodeResourceError(t *testing.T, c *Calcium, opts *ty
389389
store.On("UpdateNodeResource",
390390
mock.Anything, mock.Anything, mock.Anything, mock.Anything,
391391
mock.Anything, mock.Anything, mock.Anything, mock.Anything,
392-
).Return(types.ErrNoETCD).Twice()
392+
).Return(types.ErrNoETCD).Once()
393393
_, err := c.doAllocResource(context.Background(), opts)
394394
assert.Error(t, err)
395395
}

go.mod

+7-1
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,24 @@ require (
4545
github.com/sanity-io/litter v1.1.0
4646
github.com/sirupsen/logrus v1.4.2
4747
github.com/soheilhy/cmux v0.1.3 // indirect
48-
github.com/stretchr/testify v1.4.0
48+
github.com/stretchr/objx v0.2.0 // indirect
49+
github.com/stretchr/testify v1.6.1
4950
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8 // indirect
5051
github.com/urfave/cli/v2 v2.0.0-alpha.2
52+
github.com/vektra/mockery v1.1.2 // indirect
5153
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 // indirect
5254
go.uber.org/atomic v1.3.2 // indirect
5355
go.uber.org/automaxprocs v1.3.0
5456
go.uber.org/multierr v1.1.0 // indirect
5557
go.uber.org/zap v1.7.1 // indirect
5658
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d
59+
golang.org/x/mod v0.3.0 // indirect
5760
golang.org/x/net v0.0.0-20200319234117-63522dbf7eec
5861
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
62+
golang.org/x/tools v0.0.0-20200612022331-742c5eb664c2 // indirect
5963
google.golang.org/grpc v1.28.0
6064
gopkg.in/alexcesaro/statsd.v2 v2.0.0 // indirect
65+
gopkg.in/yaml.v2 v2.3.0 // indirect
66+
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c // indirect
6167
gotest.tools v2.2.0+incompatible // indirect
6268
)

0 commit comments

Comments
 (0)