Skip to content

Commit a3bafe3

Browse files
anrsanrs
anrs
authored and committed
feat: WAL for creating workload.
1 parent b84b401 commit a3bafe3

File tree

6 files changed

+272
-31
lines changed

6 files changed

+272
-31
lines changed

cluster/calcium/calcium_test.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,15 @@ import (
99
"testing"
1010
"time"
1111

12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/mock"
14+
1215
schedulermocks "github.com/projecteru2/core/scheduler/mocks"
1316
sourcemocks "github.com/projecteru2/core/source/mocks"
1417
storemocks "github.com/projecteru2/core/store/mocks"
1518
"github.com/projecteru2/core/types"
16-
"github.com/stretchr/testify/assert"
19+
"github.com/projecteru2/core/wal"
20+
walmocks "github.com/projecteru2/core/wal/mocks"
1721
)
1822

1923
// DummyLock replace lock for testing
@@ -55,12 +59,11 @@ func NewTestCluster() *Calcium {
5559
c.store = &storemocks.Store{}
5660
c.scheduler = &schedulermocks.Scheduler{}
5761
c.source = &sourcemocks.Source{}
62+
c.wal = &WAL{WAL: &walmocks.WAL{}}
5863

59-
wal, err := newCalciumWAL(c)
60-
if err != nil {
61-
panic(err)
62-
}
63-
c.wal = wal
64+
mwal := c.wal.WAL.(*walmocks.WAL)
65+
commit := wal.Commit(func(context.Context) error { return nil })
66+
mwal.On("Log", mock.Anything, mock.Anything, mock.Anything).Return(commit, nil)
6467

6568
return c
6669
}

cluster/calcium/create.go

+19-4
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,17 @@ import (
77
"time"
88

99
"github.com/pkg/errors"
10+
"github.com/sanity-io/litter"
11+
"golang.org/x/sync/semaphore"
12+
1013
"github.com/projecteru2/core/cluster"
1114
enginetypes "github.com/projecteru2/core/engine/types"
15+
"github.com/projecteru2/core/log"
1216
"github.com/projecteru2/core/metrics"
1317
resourcetypes "github.com/projecteru2/core/resources/types"
14-
15-
"github.com/projecteru2/core/log"
1618
"github.com/projecteru2/core/types"
1719
"github.com/projecteru2/core/utils"
18-
"github.com/sanity-io/litter"
19-
"golang.org/x/sync/semaphore"
20+
"github.com/projecteru2/core/wal"
2021
)
2122

2223
// CreateWorkload use options to create workloads
@@ -259,6 +260,14 @@ func (c *Calcium) doDeployOneWorkload(
259260
User: opts.User,
260261
CreateTime: time.Now().Unix(),
261262
}
263+
var commit wal.Commit
264+
defer func() {
265+
if commit != nil {
266+
if err := commit(context.Background()); err != nil {
267+
log.Errorf("[doDeployOneWorkload] Commit WAL %s failed: %v", eventCreateWorkload, err)
268+
}
269+
}
270+
}()
262271
return utils.Txn(
263272
ctx,
264273
// create workload
@@ -268,6 +277,12 @@ func (c *Calcium) doDeployOneWorkload(
268277
return errors.WithStack(err)
269278
}
270279
workload.ID = created.ID
280+
// We couldn't WAL the workload ID above VirtualizationCreate temporarily,
281+
// so there's a time gap window, once the core process crashes between
282+
// VirtualizationCreate and logCreateWorkload then the workload is leaked.
283+
if commit, err = c.wal.logCreateWorkload(ctx, workload.ID, node.Name); err != nil {
284+
return errors.WithStack(err)
285+
}
271286
return nil
272287
},
273288

cluster/calcium/create_test.go

+33-17
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import (
55
"testing"
66

77
"github.com/pkg/errors"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/mock"
10+
811
enginemocks "github.com/projecteru2/core/engine/mocks"
912
enginetypes "github.com/projecteru2/core/engine/types"
1013
lockmocks "github.com/projecteru2/core/lock/mocks"
@@ -14,8 +17,8 @@ import (
1417
storemocks "github.com/projecteru2/core/store/mocks"
1518
"github.com/projecteru2/core/strategy"
1619
"github.com/projecteru2/core/types"
17-
"github.com/stretchr/testify/assert"
18-
"github.com/stretchr/testify/mock"
20+
"github.com/projecteru2/core/wal"
21+
walmocks "github.com/projecteru2/core/wal/mocks"
1922
)
2023

2124
func TestCreateWorkload(t *testing.T) {
@@ -100,21 +103,18 @@ func TestCreateWorkloadTxn(t *testing.T) {
100103
scheduler.InitSchedulerV1(sche)
101104
c.store = store
102105
c.scheduler = sche
103-
engine := &enginemocks.API{}
104106

105-
node1 := &types.Node{
106-
NodeMeta: types.NodeMeta{
107-
Name: "n1",
108-
},
109-
Engine: engine,
110-
}
111-
node2 := &types.Node{
112-
NodeMeta: types.NodeMeta{
113-
Name: "n2",
114-
},
115-
Engine: engine,
116-
}
117-
nodes = []*types.Node{node1, node2}
107+
node1, node2 := nodes[0], nodes[1]
108+
109+
c.wal = &WAL{WAL: &walmocks.WAL{}}
110+
mwal := c.wal.WAL.(*walmocks.WAL)
111+
defer mwal.AssertExpectations(t)
112+
var walCommitted bool
113+
commit := wal.Commit(func(context.Context) error {
114+
walCommitted = true
115+
return nil
116+
})
117+
mwal.On("Log", mock.Anything, eventCreateWorkload, mock.Anything).Return(commit, nil)
118118

119119
store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
120120
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
@@ -166,6 +166,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
166166
assert.Error(t, m.Error, "MakeDeployStatus")
167167
}
168168
assert.EqualValues(t, 1, cnt)
169+
assert.False(t, walCommitted)
169170

170171
// commit resource changes fails: UpdateNodes
171172
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil)
@@ -181,6 +182,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
181182
strategy.Plans[strategy.Auto] = old
182183
}()
183184
store.On("UpdateNodes", mock.Anything, mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "UpdateNodes1")).Once()
185+
walCommitted = false
184186
ch, err = c.CreateWorkload(ctx, opts)
185187
assert.Nil(t, err)
186188
cnt = 0
@@ -195,6 +197,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
195197
assert.EqualValues(t, 1, node2.CPUUsed)
196198
node1.CPUUsed = 0
197199
node2.CPUUsed = 0
200+
assert.False(t, walCommitted)
198201

199202
// doCreateWorkloadOnNode fails: doGetAndPrepareNode
200203
store.On("UpdateNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil)
@@ -210,11 +213,12 @@ func TestCreateWorkloadTxn(t *testing.T) {
210213
}
211214
return
212215
}, nil)
213-
engine = node1.Engine.(*enginemocks.API)
216+
engine := node1.Engine.(*enginemocks.API)
214217
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImageLocalDigest")).Twice()
215218
engine.On("ImagePull", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImagePull")).Twice()
216219
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
217220
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
221+
walCommitted = false
218222
ch, err = c.CreateWorkload(ctx, opts)
219223
assert.Nil(t, err)
220224
cnt = 0
@@ -227,13 +231,15 @@ func TestCreateWorkloadTxn(t *testing.T) {
227231
assert.EqualValues(t, 2, cnt)
228232
assert.EqualValues(t, 0, node1.CPUUsed)
229233
assert.EqualValues(t, 0, node2.CPUUsed)
234+
assert.False(t, walCommitted)
230235

231236
// doDeployOneWorkload fails: VirtualizationCreate
232237
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return([]string{""}, nil)
233238
engine.On("ImageRemoteDigest", mock.Anything, mock.Anything).Return("", nil)
234239
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "VirtualizationCreate")).Twice()
235240
engine.On("VirtualizationRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
236241
store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(nil)
242+
walCommitted = false
237243
ch, err = c.CreateWorkload(ctx, opts)
238244
assert.Nil(t, err)
239245
cnt = 0
@@ -246,12 +252,14 @@ func TestCreateWorkloadTxn(t *testing.T) {
246252
assert.EqualValues(t, 2, cnt)
247253
assert.EqualValues(t, 0, node1.CPUUsed)
248254
assert.EqualValues(t, 0, node2.CPUUsed)
255+
assert.False(t, walCommitted)
249256

250257
// doCreateAndStartWorkload fails: AddWorkload
251258
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationCreated{ID: "c1"}, nil)
252259
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
253260
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
254261
store.On("AddWorkload", mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddWorkload")).Twice()
262+
walCommitted = false
255263
ch, err = c.CreateWorkload(ctx, opts)
256264
assert.Nil(t, err)
257265
cnt = 0
@@ -264,13 +272,15 @@ func TestCreateWorkloadTxn(t *testing.T) {
264272
assert.EqualValues(t, 2, cnt)
265273
assert.EqualValues(t, 0, node1.CPUUsed)
266274
assert.EqualValues(t, 0, node2.CPUUsed)
275+
assert.True(t, walCommitted)
267276

268277
// doCreateAndStartWorkload fails: first time AddWorkload failed
269278
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationCreated{ID: "c1"}, nil)
270279
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
271280
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
272281
store.On("AddWorkload", mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddWorkload2")).Once()
273282
store.On("AddWorkload", mock.Anything, mock.Anything).Return(nil).Once()
283+
walCommitted = false
274284
ch, err = c.CreateWorkload(ctx, opts)
275285
assert.Nil(t, err)
276286
cnt = 0
@@ -287,12 +297,14 @@ func TestCreateWorkloadTxn(t *testing.T) {
287297
assert.EqualValues(t, 2, cnt)
288298
assert.EqualValues(t, 1, errCnt)
289299
assert.EqualValues(t, 1, node1.CPUUsed+node2.CPUUsed)
300+
assert.True(t, walCommitted)
290301
store.AssertExpectations(t)
291302
engine.AssertExpectations(t)
292303
}
293304

294305
func newCreateWorkloadCluster(t *testing.T) (*Calcium, []*types.Node) {
295306
c := NewTestCluster()
307+
c.wal = &WAL{WAL: &walmocks.WAL{}}
296308
c.store = &storemocks.Store{}
297309
c.scheduler = &schedulermocks.Scheduler{}
298310
scheduler.InitSchedulerV1(c.scheduler)
@@ -337,6 +349,10 @@ func newCreateWorkloadCluster(t *testing.T) (*Calcium, []*types.Node) {
337349
return
338350
}, nil)
339351

352+
mwal := c.wal.WAL.(*walmocks.WAL)
353+
commit := wal.Commit(func(context.Context) error { return nil })
354+
mwal.On("Log", mock.Anything, mock.Anything, mock.Anything).Return(commit, nil)
355+
340356
sche := c.scheduler.(*schedulermocks.Scheduler)
341357
sche.On("SelectStorageNodes", mock.AnythingOfType("[]resourcetypes.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
342358
return scheduleInfos

cluster/calcium/lambda.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
124124
defer close(runMsgCh)
125125
wg.Wait()
126126
if err := commit(context.Background()); err != nil {
127-
log.Errorf("[RunAndWait] Commit WAL %s failed", eventCreateLambda)
127+
log.Errorf("[RunAndWait] Commit WAL %s failed: %v", eventCreateLambda, err)
128128
}
129129
log.Info("[RunAndWait] Finish run and wait for workloads")
130130
}()

cluster/calcium/wal.go

+92-1
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@ package calcium
33
import (
44
"context"
55
"encoding/json"
6+
"strings"
67

78
"github.com/projecteru2/core/log"
89
"github.com/projecteru2/core/types"
910
"github.com/projecteru2/core/wal"
1011
)
1112

1213
const (
13-
eventCreateLambda = "create-lambda"
14+
eventCreateLambda = "create-lambda"
15+
eventCreateWorkload = "create-workload"
1416
)
1517

1618
// WAL for calcium.
@@ -37,6 +39,14 @@ func newCalciumWAL(cal *Calcium) (*WAL, error) {
3739

3840
func (w *WAL) registerHandlers() {
3941
w.Register(newCreateLambdaHandler(w.calcium))
42+
w.Register(newCreateWorkloadHandler(w.calcium))
43+
}
44+
45+
func (w *WAL) logCreateWorkload(ctx context.Context, workloadID, nodename string) (wal.Commit, error) {
46+
return w.Log(ctx, eventCreateWorkload, &types.Workload{
47+
ID: workloadID,
48+
Nodename: nodename,
49+
})
4050
}
4151

4252
func (w *WAL) logCreateLambda(ctx context.Context, opts *types.DeployOptions) (wal.Commit, error) {
@@ -47,6 +57,87 @@ func (w *WAL) logCreateLambda(ctx context.Context, opts *types.DeployOptions) (w
4757
})
4858
}
4959

60+
// CreateWorkloadHandler indicates event handler for creating workload.
61+
type CreateWorkloadHandler struct {
62+
event string
63+
calcium *Calcium
64+
}
65+
66+
func newCreateWorkloadHandler(cal *Calcium) *CreateWorkloadHandler {
67+
return &CreateWorkloadHandler{
68+
event: eventCreateWorkload,
69+
calcium: cal,
70+
}
71+
}
72+
73+
// Event returns the name of the WAL event this handler is registered for.
74+
func (h *CreateWorkloadHandler) Event() string {
75+
return h.event
76+
}
77+
78+
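// Check reports whether the logged workload needs handling: false when its
// metadata can be fetched, true when the lookup fails with ErrBadCount.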
79+
func (h *CreateWorkloadHandler) Check(raw interface{}) (bool, error) {
80+
wrk, ok := raw.(*types.Workload)
81+
if !ok {
82+
return false, types.NewDetailedErr(types.ErrInvalidType, raw)
83+
}
84+
85+
_, err := h.calcium.GetWorkload(context.Background(), wrk.ID)
86+
switch {
87+
// there has been an exact workload metadata.
88+
case err == nil:
89+
log.Infof("[CreateWorkloadHandler.Check] Workload %s is availalbe", wrk.ID)
90+
return false, nil
91+
92+
case strings.HasPrefix(err.Error(), types.ErrBadCount.Error()):
93+
log.Errorf("[CreateWorkloadHandler.Check] No such workload: %v", wrk.ID)
94+
return true, nil
95+
96+
default:
97+
log.Errorf("[CreateWorkloadHandler.Check] Unexpected error: %v", err)
98+
return false, err
99+
}
100+
}
101+
102+
// Encode marshals a *types.Workload to JSON; any other type yields ErrInvalidType.
103+
func (h *CreateWorkloadHandler) Encode(raw interface{}) ([]byte, error) {
104+
wrk, ok := raw.(*types.Workload)
105+
if !ok {
106+
return nil, types.NewDetailedErr(types.ErrInvalidType, raw)
107+
}
108+
return json.Marshal(wrk)
109+
}
110+
111+
// Decode unmarshals JSON bytes into a *types.Workload.
112+
func (h *CreateWorkloadHandler) Decode(bs []byte) (interface{}, error) {
113+
wrk := &types.Workload{}
114+
err := json.Unmarshal(bs, wrk)
115+
return wrk, err
116+
}
117+
118+
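// Handle looks up the node recorded in the logged workload and removes the
// workload through that node's engine.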
119+
func (h *CreateWorkloadHandler) Handle(raw interface{}) error {
120+
wrk, ok := raw.(*types.Workload)
121+
if !ok {
122+
return types.NewDetailedErr(types.ErrInvalidType, raw)
123+
}
124+
125+
// There hasn't been the exact workload metadata, so we must remove it.
126+
node, err := h.calcium.GetNode(context.Background(), wrk.Nodename)
127+
if err != nil {
128+
log.Errorf("[CreateWorkloadHandler.Check] Get node %s failed: %v", wrk.Nodename, err)
129+
return err
130+
}
131+
wrk.Engine = node.Engine
132+
133+
err = wrk.Remove(context.Background(), true)
134+
if err != nil {
135+
log.Errorf("[CreateWorkloadHandler.Check] Remove %s failed: %v", wrk.ID, err)
136+
}
137+
138+
return err
139+
}
140+
50141
// CreateLambdaHandler indicates event handler for creating lambda.
51142
type CreateLambdaHandler struct {
52143
event string

0 commit comments

Comments
 (0)