Skip to content

Commit 10b0708

Browse files
Overhaul wal for create workload (#534)
* wal for createworkload will clean resources * wal for resource consistency during creating * wal for processing * handle created WAL by distinguishing status * revise unittest set * engine VirtualizationRemove return ErrWorkloadNotExists
1 parent 80fcdb5 commit 10b0708

File tree

10 files changed

+259
-109
lines changed

10 files changed

+259
-109
lines changed

cluster/calcium/create.go

+42-10
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,36 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
5555
defer func() {
5656
cctx, cancel := context.WithTimeout(utils.InheritTracingInfo(ctx, context.TODO()), c.config.GlobalTimeout)
5757
for nodename := range deployMap {
58-
if e := c.store.DeleteProcessing(cctx, opts.GetProcessing(nodename)); e != nil {
59-
logger.Errorf(ctx, "[Calcium.doCreateWorkloads] delete processing failed for %s: %+v", nodename, e)
58+
processing := opts.GetProcessing(nodename)
59+
if err := c.store.DeleteProcessing(cctx, processing); err != nil {
60+
logger.Errorf(ctx, "[Calcium.doCreateWorkloads] delete processing failed for %s: %+v", nodename, err)
6061
}
6162
}
6263
close(ch)
6364
cancel()
6465
}()
6566

67+
var resourceCommit wal.Commit
68+
defer func() {
69+
if resourceCommit != nil {
70+
if err := resourceCommit(); err != nil {
71+
logger.Errorf(ctx, "commit wal failed: %s, %+v", eventWorkloadResourceAllocated, err)
72+
}
73+
}
74+
}()
75+
76+
var processingCommits map[string]wal.Commit
77+
defer func() {
78+
for nodename := range processingCommits {
79+
if processingCommits[nodename] == nil {
80+
continue
81+
}
82+
if err := processingCommits[nodename](); err != nil {
83+
logger.Errorf(ctx, "commit wal failed: %s, %s, %+v", eventProcessingCreated, nodename, err)
84+
}
85+
}
86+
}()
87+
6688
_ = utils.Txn(
6789
ctx,
6890

@@ -81,15 +103,23 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
81103

82104
// commit changes
83105
nodes := []*types.Node{}
106+
processingCommits = make(map[string]wal.Commit)
84107
for nodename, deploy := range deployMap {
85108
for _, plan := range plans {
86109
plan.ApplyChangesOnNode(nodeMap[nodename], utils.Range(deploy)...)
87110
}
88111
nodes = append(nodes, nodeMap[nodename])
89-
if err = c.store.CreateProcessing(ctx, opts.GetProcessing(nodename), deploy); err != nil {
112+
processing := opts.GetProcessing(nodename)
113+
if processingCommits[nodename], err = c.wal.Log(eventProcessingCreated, processing); err != nil {
114+
return errors.WithStack(err)
115+
}
116+
if err = c.store.CreateProcessing(ctx, processing, deploy); err != nil {
90117
return errors.WithStack(err)
91118
}
92119
}
120+
if resourceCommit, err = c.wal.Log(eventWorkloadResourceAllocated, nodes); err != nil {
121+
return errors.WithStack(err)
122+
}
93123
return errors.WithStack(c.store.UpdateNodes(ctx, nodes...))
94124
})
95125
},
@@ -246,6 +276,7 @@ func (c *Calcium) doDeployOneWorkload(
246276
config *enginetypes.VirtualizationCreateOptions,
247277
decrProcessing bool,
248278
) (err error) {
279+
logger := log.WithField("Calcium", "doDeployWorkload").WithField("nodename", node.Name).WithField("opts", opts).WithField("msg", msg)
249280
workload := &types.Workload{
250281
ResourceMeta: types.ResourceMeta{
251282
CPU: msg.CPU,
@@ -276,7 +307,7 @@ func (c *Calcium) doDeployOneWorkload(
276307
defer func() {
277308
if commit != nil {
278309
if err := commit(); err != nil {
279-
log.Errorf(ctx, "[doDeployOneWorkload] Commit WAL %s failed: %v", eventCreateWorkload, err)
310+
logger.Errorf(ctx, "Commit WAL %s failed: %+v", eventWorkloadCreated, err)
280311
}
281312
}
282313
}()
@@ -292,10 +323,11 @@ func (c *Calcium) doDeployOneWorkload(
292323
// We couldn't WAL the workload ID above VirtualizationCreate temporarily,
293324
// so there's a time gap window, once the core process crashes between
294325
// VirtualizationCreate and logCreateWorkload then the workload is leaky.
295-
if commit, err = c.wal.logCreateWorkload(workload.ID, node.Name); err != nil {
296-
return err
297-
}
298-
return nil
326+
commit, err = c.wal.Log(eventWorkloadCreated, &types.Workload{
327+
ID: workload.ID,
328+
Nodename: workload.Nodename,
329+
})
330+
return errors.WithStack(err)
299331
},
300332

301333
func(ctx context.Context) (err error) {
@@ -375,13 +407,13 @@ func (c *Calcium) doDeployOneWorkload(
375407

376408
// remove workload
377409
func(ctx context.Context, _ bool) error {
378-
log.Errorf(ctx, "[doDeployOneWorkload] failed to deploy workload %s, rollback", workload.ID)
410+
logger.Errorf(ctx, "[doDeployOneWorkload] failed to deploy workload %s, rollback", workload.ID)
379411
if workload.ID == "" {
380412
return nil
381413
}
382414

383415
if err := c.store.RemoveWorkload(ctx, workload); err != nil {
384-
log.Errorf(ctx, "[doDeployOneWorkload] failed to remove workload %s")
416+
logger.Errorf(ctx, "[doDeployOneWorkload] failed to remove workload %s", workload.ID)
385417
}
386418

387419
return workload.Remove(ctx, true)

cluster/calcium/create_test.go

+9-11
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"testing"
66

7+
"github.com/pkg/errors"
78
enginemocks "github.com/projecteru2/core/engine/mocks"
89
enginetypes "github.com/projecteru2/core/engine/types"
910
lockmocks "github.com/projecteru2/core/lock/mocks"
@@ -15,8 +16,6 @@ import (
1516
"github.com/projecteru2/core/types"
1617
"github.com/projecteru2/core/wal"
1718
walmocks "github.com/projecteru2/core/wal/mocks"
18-
19-
"github.com/pkg/errors"
2019
"github.com/stretchr/testify/assert"
2120
"github.com/stretchr/testify/mock"
2221
)
@@ -107,8 +106,12 @@ func TestCreateWorkloadTxn(t *testing.T) {
107106

108107
c.wal = &WAL{WAL: &walmocks.WAL{}}
109108
mwal := c.wal.WAL.(*walmocks.WAL)
110-
defer mwal.AssertExpectations(t)
111109
var walCommitted bool
110+
commit := wal.Commit(func() error {
111+
walCommitted = true
112+
return nil
113+
})
114+
mwal.On("Log", mock.Anything, mock.Anything).Return(commit, nil)
112115

113116
store.On("CreateProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
114117
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
@@ -191,7 +194,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
191194
assert.EqualValues(t, 1, node2.CPUUsed)
192195
node1.CPUUsed = 0
193196
node2.CPUUsed = 0
194-
assert.False(t, walCommitted)
197+
assert.True(t, walCommitted)
195198

196199
// doCreateWorkloadOnNode fails: doGetAndPrepareNode
197200
store.On("UpdateNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil)
@@ -224,7 +227,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
224227
assert.EqualValues(t, 2, cnt)
225228
assert.EqualValues(t, 0, node1.CPUUsed)
226229
assert.EqualValues(t, 0, node2.CPUUsed)
227-
assert.False(t, walCommitted)
230+
assert.True(t, walCommitted)
228231

229232
// doDeployOneWorkload fails: VirtualizationCreate
230233
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return([]string{""}, nil)
@@ -245,18 +248,13 @@ func TestCreateWorkloadTxn(t *testing.T) {
245248
assert.EqualValues(t, 2, cnt)
246249
assert.EqualValues(t, 0, node1.CPUUsed)
247250
assert.EqualValues(t, 0, node2.CPUUsed)
248-
assert.False(t, walCommitted)
251+
assert.True(t, walCommitted)
249252

250253
// doCreateAndStartWorkload fails: AddWorkload
251254
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationCreated{ID: "c1"}, nil)
252255
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
253256
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
254257
store.On("AddWorkload", mock.Anything, mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddWorkload")).Twice()
255-
commit := wal.Commit(func() error {
256-
walCommitted = true
257-
return nil
258-
})
259-
mwal.On("Log", eventCreateWorkload, mock.Anything).Return(commit, nil)
260258
walCommitted = false
261259
ch, err = c.CreateWorkload(ctx, opts)
262260
assert.Nil(t, err)

cluster/calcium/lambda_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
2929
c.wal = &WAL{WAL: &walmocks.WAL{}}
3030

3131
mwal := c.wal.WAL.(*walmocks.WAL)
32-
defer mwal.AssertExpectations(t)
32+
defer mwal.AssertNotCalled(t, "Log")
33+
mwal.On("Log", mock.Anything, mock.Anything).Return(nil, nil)
3334

3435
opts := &types.DeployOptions{
3536
Name: "zc:name",

cluster/calcium/replace_test.go

-3
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,6 @@ func TestReplaceWorkload(t *testing.T) {
168168
// failed by VirtualizationCreate
169169
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(nil, types.ErrCannotGetEngine).Once()
170170
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(types.ErrCannotGetEngine).Once()
171-
//store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
172-
//engine.On("VirtualizationRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
173-
//store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(nil).Once()
174171
ch, err = c.ReplaceWorkload(ctx, opts)
175172
assert.NoError(t, err)
176173
for r := range ch {

0 commit comments

Comments
 (0)