
Commit 2825832

jschwinger233 authored and CMGS committed
add test cases, resolve dead lock
1 parent 113786d commit 2825832

21 files changed, +195 -114 lines

cluster/calcium/create.go

+11 -12
@@ -34,20 +34,18 @@ func (c *Calcium) CreateWorkload(ctx context.Context, opts *types.DeployOptions)
 		return nil, logger.Err(errors.WithStack(types.NewDetailedErr(types.ErrBadCount, opts.Count)))
 	}
 
-	ch, err := c.doCreateWorkloads(ctx, opts)
-	return ch, logger.Err(errors.WithStack(err))
+	return c.doCreateWorkloads(ctx, opts), nil
 }
 
 // transaction: resource metadata consistency
-func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptions) (chan *types.CreateWorkloadMessage, error) {
+func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptions) chan *types.CreateWorkloadMessage {
 	logger := log.WithField("Calcium", "doCreateWorkloads").WithField("opts", opts)
 	ch := make(chan *types.CreateWorkloadMessage)
 	// RFC: while computing an app's current deployment state, only this entrypoint of this app may be deploying at any one time
 	// so a global lock is taken here and released only after the deployment finishes
 	// achieved via Processing state tracking, 18 Oct 2018
 
 	var (
-		err error
 		plans []resourcetypes.ResourcePlans
 		deployMap map[string]int
 		rollbackMap map[string][]int
@@ -56,9 +54,8 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
 	go func() {
 		defer func() {
 			for nodename := range deployMap {
-				if e := c.store.DeleteProcessing(ctx, opts, nodename); e != nil {
-					err = e
-					logger.Errorf("[Calcium.doCreateWorkloads] delete processing failed for %s: %+v", nodename, err)
+				if e := c.store.DeleteProcessing(context.Background(), opts, nodename); e != nil {
+					logger.Errorf("[Calcium.doCreateWorkloads] delete processing failed for %s: %+v", nodename, e)
 				}
 			}
 			close(ch)
@@ -78,7 +75,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
 
				// calculate plans
				if plans, deployMap, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
-					return errors.WithStack(err)
+					return err
				}
 
				// commit changes
@@ -97,7 +94,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
			},
 
			// then: deploy workloads
-			func(ctx context.Context) error {
+			func(ctx context.Context) (err error) {
				rollbackMap, err = c.doDeployWorkloads(ctx, ch, opts, plans, deployMap)
				return errors.WithStack(err)
			},
@@ -124,7 +121,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
		)
	}()
 
-	return ch, errors.WithStack(err)
+	return ch
 }
 
 func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateWorkloadMessage, opts *types.DeployOptions, plans []resourcetypes.ResourcePlans, deployMap map[string]int) (_ map[string][]int, err error) {
@@ -205,10 +202,12 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
	// remap is deliberately kept out of the transaction: rolling it back would be too costly
	// if remap fails, the consequence is that the share pool is not updated, which we consider tolerable
	// besides, remap is idempotent; even if this remap fails, the next remap converges to the correct state
-	c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
+	if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
		c.doRemapResourceAndLog(ctx, logger, node)
		return nil
-	})
+	}); err != nil {
+		logger.Errorf("failed to lock node to remap: %v", err)
+	}
	return indices, errors.WithStack(err)
 }
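
The cleanup and return pattern introduced above — return the channel unconditionally, drop the shared err variable, and run the deferred DeleteProcessing with context.Background() so it still executes after the caller's context is cancelled or times out — can be shown in isolation. A minimal sketch with made-up produceMessages/cleanup helpers; this is not the project's code:

package main

import (
	"context"
	"fmt"
	"time"
)

// produceMessages always returns a channel, fills it from a goroutine, and
// performs cleanup in a defer using context.Background(), so cleanup is not
// skipped when the request context has already been cancelled.
func produceMessages(ctx context.Context, n int) chan int {
	ch := make(chan int)
	go func() {
		defer func() {
			cleanup(context.Background()) // must not depend on the possibly-cancelled request ctx
			close(ch)
		}()
		for i := 0; i < n; i++ {
			select {
			case ch <- i:
			case <-ctx.Done():
				return // stop producing, but still run cleanup and close(ch)
			}
		}
	}()
	return ch
}

func cleanup(ctx context.Context) {
	fmt.Println("cleanup ran, ctx err:", ctx.Err()) // <nil> even if the caller timed out
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	for v := range produceMessages(ctx, 3) {
		fmt.Println("got", v)
	}
}

Because the channel is always returned and always closed, the caller can simply range over it without a separate error path.
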
cluster/calcium/create_test.go

+8 -8
@@ -5,9 +5,6 @@ import (
 	"testing"
 
 	"github.com/pkg/errors"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/mock"
-
 	enginemocks "github.com/projecteru2/core/engine/mocks"
 	enginetypes "github.com/projecteru2/core/engine/types"
 	lockmocks "github.com/projecteru2/core/lock/mocks"
@@ -19,6 +16,8 @@ import (
 	"github.com/projecteru2/core/types"
 	"github.com/projecteru2/core/wal"
 	walmocks "github.com/projecteru2/core/wal/mocks"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
 )
 
 func TestCreateWorkload(t *testing.T) {
@@ -110,11 +109,6 @@ func TestCreateWorkloadTxn(t *testing.T) {
 	mwal := c.wal.WAL.(*walmocks.WAL)
 	defer mwal.AssertExpectations(t)
 	var walCommitted bool
-	commit := wal.Commit(func(context.Context) error {
-		walCommitted = true
-		return nil
-	})
-	mwal.On("Log", mock.Anything, eventCreateWorkload, mock.Anything).Return(commit, nil)
 
 	store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
 	store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
@@ -239,6 +233,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
 	engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "VirtualizationCreate")).Twice()
 	engine.On("VirtualizationRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
 	store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(nil)
+	store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
 	walCommitted = false
 	ch, err = c.CreateWorkload(ctx, opts)
 	assert.Nil(t, err)
@@ -259,6 +254,11 @@ func TestCreateWorkloadTxn(t *testing.T) {
 	engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
 	engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
 	store.On("AddWorkload", mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddWorkload")).Twice()
+	commit := wal.Commit(func(context.Context) error {
+		walCommitted = true
+		return nil
+	})
+	mwal.On("Log", mock.Anything, eventCreateWorkload, mock.Anything).Return(commit, nil)
 	walCommitted = false
 	ch, err = c.CreateWorkload(ctx, opts)
 	assert.Nil(t, err)
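
The relocated block registers the mocked Log expectation, and the commit closure that flips walCommitted, only in the phase of the test that is actually expected to reach the WAL. A rough, self-contained sketch of the same testify technique, with a simplified stand-in Logger interface rather than the real wal package:

package main

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
)

// Logger is a simplified stand-in for a WAL-like dependency.
type Logger interface {
	Log(event string) (commit func() error, err error)
}

type mockLogger struct{ mock.Mock }

func (m *mockLogger) Log(event string) (func() error, error) {
	args := m.Called(event)
	var commit func() error
	if fn := args.Get(0); fn != nil {
		commit = fn.(func() error)
	}
	return commit, args.Error(1)
}

// createWorkload is the code under test: it logs an event and, on success,
// invokes the returned commit closure.
func createWorkload(l Logger) error {
	commit, err := l.Log("create-workload")
	if err != nil {
		return err
	}
	return commit()
}

func TestCommitObservedViaClosure(t *testing.T) {
	m := &mockLogger{}
	committed := false
	// the mock returns a closure so the test can observe the commit
	m.On("Log", "create-workload").Return(func() error {
		committed = true
		return nil
	}, nil)

	assert.NoError(t, createWorkload(m))
	assert.True(t, committed)
	m.AssertExpectations(t)
}

Returning a closure from the mock lets the test observe whether the code under test actually committed.
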
cluster/calcium/dissociate.go

+11 -26
@@ -26,15 +26,17 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
 
 		for nodename, workloadIDs := range nodeWorkloadGroup {
 			if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
-				for _, workloadID := range workloadIDs {
+				for _, workloadID := range workloadIDs { // nolint:scopelint
+					msg := &types.DissociateWorkloadMessage{WorkloadID: workloadID} // nolint:scopelint
 					if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error {
-						msg := &types.DissociateWorkloadMessage{WorkloadID: workloadID}
-						if err := utils.Txn(
+						return utils.Txn(
 							ctx,
 							// if
-							func(ctx context.Context) error {
-								log.Infof("[DissociateWorkload] Workload %s dissociated", workload.ID)
-								return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
+							func(ctx context.Context) (err error) {
+								if err = c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr); err == nil {
+									log.Infof("[DissociateWorkload] Workload %s dissociated", workload.ID)
+								}
+								return errors.WithStack(err)
 							},
 							// then
 							func(ctx context.Context) error {
@@ -48,36 +50,19 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
 								return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
 							},
 							c.config.GlobalTimeout,
-						); err != nil {
-							msg.Error = err
-							logger.WithField("id", workloadID).Errorf("failed to diss workload: %+v", err)
-						}
-						ch <- msg
-						return nil
+						)
 					}); err != nil {
 						logger.WithField("id", workloadID).Errorf("failed to lock workload: %+v", err)
+						msg.Error = err
 					}
+					ch <- msg
 				}
 				c.doRemapResourceAndLog(ctx, logger, node)
 				return nil
 			}); err != nil {
 				logger.WithField("nodename", nodename).Errorf("failed to lock node: %+v", err)
 			}
 		}
-
-		for _, id := range ids {
-			err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
-				return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) {
-
-					c.doRemapResourceAndLog(ctx, logger, node)
-					return err
-				})
-			})
-			if err != nil {
-				logger.Errorf("[DissociateWorkload] Dissociate workload %s failed, err: %+v", id, err)
-			}
-			ch <- &types.DissociateWorkloadMessage{WorkloadID: id, Error: err}
-		}
 	}()
 	return ch, nil
 }
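
The deleted loop took the workload lock first and the node lock inside it, while the surviving loop takes the node lock first and the workload lock inside it; two concurrent calls mixing those orders can deadlock, which is presumably the deadlock the commit message refers to. Keeping a single lock order everywhere is the usual fix, sketched here with sync.Mutex standing in for the etcd-backed distributed locks:

package main

import (
	"fmt"
	"sync"
)

var (
	nodeMu     sync.Mutex
	workloadMu sync.Mutex
)

// withNodeThenWorkload is the only lock order used anywhere in this sketch:
// node first, workload second. Mixing this with a workload-then-node path
// (as the deleted loop did) can deadlock under concurrency.
func withNodeThenWorkload(f func()) {
	nodeMu.Lock()
	defer nodeMu.Unlock()
	workloadMu.Lock()
	defer workloadMu.Unlock()
	f()
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			withNodeThenWorkload(func() { fmt.Println("worker", i, "done") })
		}(i)
	}
	wg.Wait()
}
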
cluster/calcium/dissociate_test.go

+8 -2
@@ -46,27 +46,33 @@ func TestDissociateWorkload(t *testing.T) {
 
 	store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{c1}, nil)
 	// failed by lock
+	store.On("GetNode", mock.Anything, "node1").Return(node1, nil)
 	store.On("CreateLock", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
 	ch, err := c.DissociateWorkload(ctx, []string{"c1"})
 	assert.NoError(t, err)
 	for r := range ch {
 		assert.Error(t, r.Error)
 	}
+	store.AssertExpectations(t)
+
 	store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
-	store.On("GetNode", mock.Anything, "node1").Return(node1, nil)
 	// failed by RemoveWorkload
+	store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
 	store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(types.ErrNoETCD).Once()
+	store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
 	ch, err = c.DissociateWorkload(ctx, []string{"c1"})
 	assert.NoError(t, err)
 	for r := range ch {
 		assert.Error(t, r.Error)
 	}
+	store.AssertExpectations(t)
+
 	store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(nil)
 	// success
-	store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
 	ch, err = c.DissociateWorkload(ctx, []string{"c1"})
 	assert.NoError(t, err)
 	for r := range ch {
 		assert.NoError(t, r.Error)
 	}
+	store.AssertExpectations(t)
 }
cluster/calcium/lock.go

+3
@@ -57,6 +57,7 @@ func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, f func(co
 func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(context.Context, map[string]*types.Workload) error) error {
 	workloads := map[string]*types.Workload{}
 	locks := map[string]lock.DistributedLock{}
+	defer log.Debugf("[withWorkloadsLocked] Workloads %+v unlocked", ids)
 	defer func() { c.doUnlockAll(context.Background(), locks) }()
 	cs, err := c.GetWorkloads(ctx, ids)
 	if err != nil {
@@ -68,6 +69,7 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(
 		if err != nil {
 			return errors.WithStack(err)
 		}
+		log.Debugf("[withWorkloadsLocked] Workload %s locked", workload.ID)
 		locks[workload.ID] = lock
 		workloads[workload.ID] = workload
 	}
@@ -77,6 +79,7 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(
 func (c *Calcium) withNodesLocked(ctx context.Context, podname string, nodenames []string, labels map[string]string, all bool, f func(context.Context, map[string]*types.Node) error) error {
 	nodes := map[string]*types.Node{}
 	locks := map[string]lock.DistributedLock{}
+	defer log.Debugf("[withNodesLocked] Nodes %+v unlocked", nodenames)
 	defer c.doUnlockAll(context.Background(), locks)
 	ns, err := c.getNodes(ctx, podname, nodenames, labels, all)
 	if err != nil {
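
The added debug lines rely on defer's LIFO ordering: the "unlocked" Debugf is registered before the unlock defer, so it actually runs after doUnlockAll has released the locks. A tiny standard-library illustration of that ordering:

package main

import "fmt"

func demo() {
	fmt.Println("start")
	defer fmt.Println("runs last (registered first)")    // analogous to the "unlocked" debug log
	defer fmt.Println("runs first (registered second)")  // analogous to the unlock itself
	fmt.Println("work")
}

func main() {
	demo()
	// prints: start, work, "runs first (registered second)", "runs last (registered first)"
}
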
cluster/calcium/realloc.go

+1
@@ -63,6 +63,7 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, workload
 						return errors.WithStack(err)
 					}
 				}
+
 				return errors.WithStack(c.doReallocWorkloadsOnInstance(ctx, node.Engine, resourceMeta, workload))
 			},
 			// then commit changes
cluster/calcium/realloc_test.go

+8 -9
@@ -285,14 +285,11 @@ func TestReallocBindCpu(t *testing.T) {
 	ctx := context.Background()
 	store := &storemocks.Store{}
 	c.store = store
-	pod1 := &types.Pod{
-		Name: "p1",
-	}
+
 	lock := &lockmocks.DistributedLock{}
 	lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
 	lock.On("Unlock", mock.Anything).Return(nil)
 	store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
-	store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)
 	engine := &enginemocks.API{}
 	engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
 
@@ -361,32 +358,34 @@ func TestReallocBindCpu(t *testing.T) {
 
 	store.On("GetNode", mock.Anything, "node3").Return(node3, nil)
 	store.On("GetWorkloads", mock.Anything, []string{"c5"}).Return([]*types.Workload{c5}, nil)
-	store.On("GetWorkloads", mock.Anything, []string{"c6"}).Return([]*types.Workload{c6}, nil)
-	store.On("GetWorkloads", mock.Anything, []string{"c6", "c5"}).Return([]*types.Workload{c5, c6}, nil)
 	engine.On("VirtualizationUpdateResource", mock.Anything, mock.Anything, mock.Anything).Return(nil)
 	store.On("UpdateWorkload", mock.Anything, mock.Anything).Return(nil)
 
 	// failed by UpdateNodes
 	store.On("UpdateNodes", mock.Anything, mock.Anything).Return(types.ErrBadWorkloadID).Once()
 	err := c.ReallocResource(ctx, newReallocOptions("c5", 0.1, 2*int64(units.MiB), nil, types.TriFalse, types.TriKeep))
 	assert.Error(t, err)
-	store.On("UpdateNodes", mock.Anything, mock.Anything).Return(nil)
+	store.AssertExpectations(t)
 
+	store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
+	store.On("UpdateNodes", mock.Anything, mock.Anything).Return(nil)
 	err = c.ReallocResource(ctx, newReallocOptions("c5", 0.1, 2*int64(units.MiB), nil, types.TriFalse, types.TriKeep))
 	assert.NoError(t, err)
 	assert.Equal(t, 0, len(c5.ResourceMeta.CPU))
+	store.AssertExpectations(t)
 
+	store.On("GetWorkloads", mock.Anything, []string{"c6"}).Return([]*types.Workload{c6}, nil)
 	err = c.ReallocResource(ctx, newReallocOptions("c6", 0.1, 2*int64(units.MiB), nil, types.TriTrue, types.TriKeep))
 	assert.NoError(t, err)
 	assert.NotEmpty(t, c6.ResourceMeta.CPU)
+	store.AssertExpectations(t)
 
 	node3.CPU = types.CPUMap{"0": 10, "1": 70, "2": 100, "3": 100}
 	err = c.ReallocResource(ctx, newReallocOptions("c5", -0.1, 2*int64(units.MiB), nil, types.TriTrue, types.TriKeep))
-
 	assert.NoError(t, err)
 	assert.NotEmpty(t, c5.ResourceMeta.CPU)
 	err = c.ReallocResource(ctx, newReallocOptions("c6", -0.1, 2*int64(units.MiB), nil, types.TriFalse, types.TriKeep))
-
 	assert.NoError(t, err)
 	assert.Equal(t, 0, len(c6.ResourceMeta.CPU))
+	store.AssertExpectations(t)
 }
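
The reshuffled expectations follow a common testify rhythm: register the failing return with .Once() for the error phase, call store.AssertExpectations(t) to confirm that phase consumed it, then register the happy-path return for the next phase. A small self-contained sketch of that flow, using a made-up Store interface rather than the project's:

package main

import (
	"errors"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
)

type Store interface {
	UpdateNodes() error
}

type mockStore struct{ mock.Mock }

func (m *mockStore) UpdateNodes() error {
	return m.Called().Error(0)
}

func realloc(s Store) error { return s.UpdateNodes() }

func TestPhasedExpectations(t *testing.T) {
	s := &mockStore{}

	// phase 1: the first call fails exactly once
	s.On("UpdateNodes").Return(errors.New("etcd down")).Once()
	assert.Error(t, realloc(s))
	s.AssertExpectations(t) // the Once() expectation must be consumed here

	// phase 2: subsequent calls succeed
	s.On("UpdateNodes").Return(nil)
	assert.NoError(t, realloc(s))
	s.AssertExpectations(t)
}
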
cluster/calcium/remove.go

+9 -14
@@ -35,18 +35,17 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
 			defer wg.Done()
 			if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
 				for _, workloadID := range workloadIDs {
+					ret := &types.RemoveWorkloadMessage{WorkloadID: workloadID, Success: true, Hook: []*bytes.Buffer{}}
 					if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error {
-						ret := &types.RemoveWorkloadMessage{WorkloadID: workloadID, Success: true, Hook: []*bytes.Buffer{}}
-						if err := utils.Txn(
+						return utils.Txn(
 							ctx,
 							// if
 							func(ctx context.Context) error {
 								return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionIncr))
 							},
 							// then
-							func(ctx context.Context) error {
-								err := errors.WithStack(c.doRemoveWorkload(ctx, workload, force))
-								if err != nil {
+							func(ctx context.Context) (err error) {
+								if err = errors.WithStack(c.doRemoveWorkload(ctx, workload, force)); err != nil {
 									log.Infof("[RemoveWorkload] Workload %s removed", workload.ID)
 								}
 								return err
@@ -59,22 +58,19 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
 								return errors.WithStack(c.store.UpdateNodeResource(ctx, node, &workload.ResourceMeta, store.ActionDecr))
 							},
 							c.config.GlobalTimeout,
-						); err != nil {
-							logger.WithField("id", workloadID).Errorf("[RemoveWorkload] Remove workload failed: %+v", err)
-							ret.Hook = append(ret.Hook, bytes.NewBufferString(err.Error()))
-							ret.Success = false
-						}
-
-						ch <- ret
-						return nil
+						)
 					}); err != nil {
 						logger.WithField("id", workloadID).Errorf("failed to lock workload: %+v", err)
+						ret.Hook = append(ret.Hook, bytes.NewBufferString(err.Error()))
+						ret.Success = false
 					}
+					ch <- ret
 				}
 				c.doRemapResourceAndLog(ctx, logger, node)
 				return nil
 			}); err != nil {
 				logger.WithField("nodename", nodename).Errorf("failed to lock node: %+v", err)
+				ch <- &types.RemoveWorkloadMessage{Success: false}
 			}
 		}(nodename, workloadIDs)
 	}
@@ -103,7 +99,6 @@ func (c *Calcium) doRemoveWorkload(ctx context.Context, workload *types.Workload
 		},
 		c.config.GlobalTimeout,
 	)
-
 }
 
 // removes the container synchronously; extremely useful in situations where you need to wait for it!
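
remove.go and dissociate.go now both return utils.Txn(...) directly from the locked closure, so a failure in any phase surfaces as the closure's error and is turned into the outgoing message outside the lock. The utils.Txn implementation is not part of this diff; the sketch below only guesses at the general if/then/rollback shape such a helper takes and is not the project's code:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// txn runs cond ("if"), then onSuccess ("then") when cond succeeds, and
// rollback when cond succeeded but onSuccess failed. All phases share a
// timeout. This is only an illustrative shape, not the real utils.Txn.
func txn(ctx context.Context, cond, onSuccess, rollback func(context.Context) error, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	if err := cond(ctx); err != nil {
		return err // nothing to undo
	}
	if err := onSuccess(ctx); err != nil {
		// best-effort rollback; keep the original error
		if rerr := rollback(ctx); rerr != nil {
			return fmt.Errorf("%w (rollback also failed: %v)", err, rerr)
		}
		return err
	}
	return nil
}

func main() {
	err := txn(context.Background(),
		func(ctx context.Context) error { fmt.Println("if: reserve resource"); return nil },
		func(ctx context.Context) error { fmt.Println("then: remove workload"); return errors.New("engine error") },
		func(ctx context.Context) error { fmt.Println("rollback: release resource"); return nil },
		time.Second,
	)
	fmt.Println("txn result:", err)
}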