Commit 86cafe8

anrs authored
feat: WAL for creating lambda (#342)
Co-authored-by: anrs <anders.hu@shopee.com>
1 parent cb6eaa9 commit 86cafe8

13 files changed (+498 -104 lines)

cluster/calcium/calcium.go

+8 -1
@@ -24,6 +24,7 @@ type Calcium struct {
 	scheduler scheduler.Scheduler
 	source    source.Source
 	watcher   discovery.Service
+	wal       *WAL
 }

 // New returns a new cluster config
@@ -52,11 +53,17 @@ func New(config types.Config, embeddedStorage bool) (*Calcium, error) {
 	default:
 		log.Warn("[Calcium] SCM not set, build API disabled")
 	}
+	if err != nil {
+		log.Errorf("[Calcium] SCAM failed: %v", err)
+		return nil, err
+	}

 	// set watcher
 	watcher := helium.New(config.GRPCConfig, store)

-	return &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}, err
+	cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
+	cal.wal, err = newCalciumWAL(cal)
+	return cal, err
 }

 // Finalizer use for defer
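The WAL type and the newCalciumWAL constructor wired in above live in a file this view does not show (the commit touches 13 files; only 4 diffs appear here). As a rough illustration of the log-then-commit pattern the wiring implies, here is a self-contained toy in Go: an operation appends a pending entry before doing its work, and the returned commit closure marks the entry done, so that after a crash every pending entry without a matching commit can be replayed and cleaned up. Every name and the on-disk format below are invented for illustration; none of this is the commit's actual wal package API.

// Package walsketch is a toy illustration, not projecteru2/core code.
package walsketch

import (
	"context"
	"fmt"
	"os"
	"sync"
)

// Commit marks a previously logged event as finished, mirroring the shape
// of the wal.Commit value that logCreateLambda returns in the lambda diff.
type Commit func(context.Context) error

// FileWAL is a minimal append-only log: one line per record; an event is
// orphaned if its pending line has no matching commit line after a crash.
type FileWAL struct {
	mu   sync.Mutex
	file *os.File
	next int
}

// OpenFileWAL opens (or creates) the log file in append-only mode.
func OpenFileWAL(path string) (*FileWAL, error) {
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return nil, err
	}
	return &FileWAL{file: f}, nil
}

// Log appends a pending entry for the event and returns the closure that
// commits it once the operation has finished.
func (w *FileWAL) Log(ctx context.Context, event, payload string) (Commit, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.next++
	id := w.next
	if _, err := fmt.Fprintf(w.file, "pending %d %s %s\n", id, event, payload); err != nil {
		return nil, err
	}
	return func(context.Context) error {
		w.mu.Lock()
		defer w.mu.Unlock()
		_, err := fmt.Fprintf(w.file, "commit %d\n", id)
		return err
	}, nil
}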

cluster/calcium/calcium_test.go

+39 -6
@@ -3,6 +3,8 @@ package calcium
 import (
 	"context"
 	"io/ioutil"
+	"os"
+	"path/filepath"
 	"sync"
 	"testing"
 	"time"
@@ -32,6 +34,11 @@ func (d *dummyLock) Unlock(ctx context.Context) error {
 }

 func NewTestCluster() *Calcium {
+	walDir, err := ioutil.TempDir(os.TempDir(), "core.wal.*")
+	if err != nil {
+		panic(err)
+	}
+
 	c := &Calcium{}
 	c.config = types.Config{
 		GlobalTimeout: 30 * time.Second,
@@ -42,34 +49,60 @@ func NewTestCluster() *Calcium {
 			MaxShare:  -1,
 			ShareBase: 100,
 		},
+		WALFile: filepath.Join(walDir, "core.wal.log"),
 	}
 	c.store = &storemocks.Store{}
 	c.scheduler = &schedulermocks.Scheduler{}
 	c.source = &sourcemocks.Source{}
+
+	wal, err := newCalciumWAL(c)
+	if err != nil {
+		panic(err)
+	}
+	c.wal = wal
+
 	return c
 }

 func TestNewCluster(t *testing.T) {
-	_, err := New(types.Config{}, false)
+	config := types.Config{WALFile: "/tmp/a"}
+	_, err := New(config, false)
 	assert.Error(t, err)
-	c, err := New(types.Config{}, true)
+
+	c, err := New(config, true)
 	assert.NoError(t, err)
+
 	c.Finalizer()
 	privFile, err := ioutil.TempFile("", "priv")
 	assert.NoError(t, err)
 	_, err = privFile.WriteString("privkey")
 	assert.NoError(t, err)
	defer privFile.Close()
+
+	config.Git = types.GitConfig{PrivateKey: privFile.Name()}
+
+	var wg sync.WaitGroup
+	wg.Add(1)
 	go func() {
-		c, err := New(types.Config{Git: types.GitConfig{SCMType: "gitlab", PrivateKey: privFile.Name()}}, true)
-		assert.NoError(t, err)
+		defer wg.Done()
+		config.Git.SCMType = "gitlab"
+		config.WALFile = "/tmp/b"
+		c, err := New(config, true)
+		assert.NoError(t, err, err)
 		c.Finalizer()
 	}()
+
+	wg.Add(1)
 	go func() {
-		c, err := New(types.Config{Git: types.GitConfig{SCMType: "github", PrivateKey: privFile.Name()}}, true)
-		assert.NoError(t, err)
+		defer wg.Done()
+		config.WALFile = "/tmp/c"
+		config.Git.SCMType = "github"
+		c, err := New(config, true)
+		assert.NoError(t, err, err)
 		c.Finalizer()
 	}()
+
+	wg.Wait()
 }

 func TestFinalizer(t *testing.T) {
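A side note on the test above: each call to New gets its own WALFile (/tmp/a, /tmp/b, /tmp/c), presumably because an open WAL owns its file and concurrent clusters cannot share one. The fixed /tmp paths can collide across runs, though; here is a sketch of the same setup using a per-test temporary directory (t.TempDir, Go 1.15+), assuming the same package and imports as the diff above:

// TestNewClusterTempWAL is a hypothetical variant in the same calcium test
// package, which already imports filepath, types, and assert per the diff.
func TestNewClusterTempWAL(t *testing.T) {
	// t.TempDir is unique per test and removed automatically, so repeated
	// runs never trip over a stale WAL file.
	config := types.Config{WALFile: filepath.Join(t.TempDir(), "core.wal.log")}
	c, err := New(config, true)
	assert.NoError(t, err)
	c.Finalizer()
}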

cluster/calcium/create_test.go

+72 -61
@@ -81,7 +81,7 @@ func TestCreateWorkload(t *testing.T) {
 }

 func TestCreateWorkloadTxn(t *testing.T) {
-	c := NewTestCluster()
+	c, nodes := newCreateWorkloadCluster(t)
 	ctx := context.Background()
 	opts := &types.DeployOptions{
 		Name: "zc:name",
@@ -94,69 +94,12 @@ func TestCreateWorkloadTxn(t *testing.T) {
 			Name: "good-entrypoint",
 		},
 	}
-	store := &storemocks.Store{}
-	sche := &schedulermocks.Scheduler{}
-	scheduler.InitSchedulerV1(sche)
-	c.store = store
-	c.scheduler = sche
-	engine := &enginemocks.API{}
-
-	pod1 := &types.Pod{Name: "p1"}
-	node1 := &types.Node{
-		NodeMeta: types.NodeMeta{
-			Name: "n1",
-		},
-		Engine: engine,
-	}
-	node2 := &types.Node{
-		NodeMeta: types.NodeMeta{
-			Name: "n2",
-		},
-		Engine: engine,
-	}
-	nodes := []*types.Node{node1, node2}
-
-	store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
-	store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
-	store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)

-	// doAllocResource fails: MakeDeployStatus
-	lock := &lockmocks.DistributedLock{}
-	lock.On("Lock", mock.Anything).Return(context.Background(), nil)
-	lock.On("Unlock", mock.Anything).Return(nil)
-	store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
-	store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)
-	store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
-	store.On("GetNode",
-		mock.AnythingOfType("*context.emptyCtx"),
-		mock.AnythingOfType("string"),
-	).Return(
-		func(_ context.Context, name string) (node *types.Node) {
-			node = node1
-			if name == "n2" {
-				node = node2
-			}
-			return
-		}, nil)
-	sche.On("SelectStorageNodes", mock.AnythingOfType("[]resourcetypes.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
-		return scheduleInfos
-	}, len(nodes), nil)
-	sche.On("SelectStorageNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
-		return scheduleInfos
-	}, len(nodes), nil)
-	sche.On("SelectVolumeNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("types.VolumeBindings")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ types.VolumeBindings) []resourcetypes.ScheduleInfo {
-		return scheduleInfos
-	}, nil, len(nodes), nil)
-	sche.On("SelectMemoryNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("float64"), mock.AnythingOfType("int64")).Return(
-		func(scheduleInfos []resourcetypes.ScheduleInfo, _ float64, _ int64) []resourcetypes.ScheduleInfo {
-			for i := range scheduleInfos {
-				scheduleInfos[i].Capacity = 1
-			}
-			return scheduleInfos
-		}, len(nodes), nil)
+	store := c.store.(*storemocks.Store)
 	store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(
 		errors.Wrap(context.DeadlineExceeded, "MakeDeployStatus"),
 	).Once()
+
 	ch, err := c.CreateWorkload(ctx, opts)
 	assert.Nil(t, err)
 	cnt := 0
@@ -168,7 +111,6 @@ func TestCreateWorkloadTxn(t *testing.T) {
 	assert.EqualValues(t, 1, cnt)

 	// commit resource changes fails: UpdateNodes
-	store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
 	store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil)
 	old := strategy.Plans[strategy.Auto]
 	strategy.Plans[strategy.Auto] = func(sis []strategy.Info, need, total, _ int) (map[string]int, error) {
@@ -191,6 +133,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
 		assert.Error(t, m.Error, "UpdateNodes1")
 	}
 	assert.EqualValues(t, 1, cnt)
+	node1, node2 := nodes[0], nodes[1]
 	assert.EqualValues(t, 1, node1.CPUUsed)
 	assert.EqualValues(t, 1, node2.CPUUsed)
 	node1.CPUUsed = 0
@@ -221,6 +164,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
 		}
 		return
 	}, nil)
+	engine := node1.Engine.(*enginemocks.API)
 	engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImageLocalDigest")).Twice()
 	engine.On("ImagePull", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImagePull")).Twice()
 	store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
@@ -299,3 +243,70 @@ func TestCreateWorkloadTxn(t *testing.T) {
 	assert.EqualValues(t, 1, node1.CPUUsed+node2.CPUUsed)
 	return
 }
+
+func newCreateWorkloadCluster(t *testing.T) (*Calcium, []*types.Node) {
+	c := NewTestCluster()
+	c.store = &storemocks.Store{}
+	c.scheduler = &schedulermocks.Scheduler{}
+	scheduler.InitSchedulerV1(c.scheduler)
+
+	engine := &enginemocks.API{}
+	pod1 := &types.Pod{Name: "p1"}
+	node1 := &types.Node{
+		NodeMeta: types.NodeMeta{
+			Name: "n1",
+		},
+		Engine: engine,
+	}
+	node2 := &types.Node{
+		NodeMeta: types.NodeMeta{
+			Name: "n2",
+		},
+		Engine: engine,
+	}
+	nodes := []*types.Node{node1, node2}
+
+	store := c.store.(*storemocks.Store)
+	store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+	// doAllocResource fails: MakeDeployStatus
+	lock := &lockmocks.DistributedLock{}
+	lock.On("Lock", mock.Anything).Return(context.Background(), nil)
+	lock.On("Unlock", mock.Anything).Return(nil)
+	store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
+	store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)
+	store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
+	store.On("GetNode",
+		mock.AnythingOfType("*context.emptyCtx"),
+		mock.AnythingOfType("string"),
+	).Return(
+		func(_ context.Context, name string) (node *types.Node) {
+			node = node1
+			if name == "n2" {
+				node = node2
+			}
+			return
+		}, nil)
+
+	sche := c.scheduler.(*schedulermocks.Scheduler)
+	sche.On("SelectStorageNodes", mock.AnythingOfType("[]resourcetypes.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
+		return scheduleInfos
+	}, len(nodes), nil)
+	sche.On("SelectStorageNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
+		return scheduleInfos
+	}, len(nodes), nil)
+	sche.On("SelectVolumeNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("types.VolumeBindings")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ types.VolumeBindings) []resourcetypes.ScheduleInfo {
+		return scheduleInfos
+	}, nil, len(nodes), nil)
+	sche.On("SelectMemoryNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("float64"), mock.AnythingOfType("int64")).Return(
+		func(scheduleInfos []resourcetypes.ScheduleInfo, _ float64, _ int64) []resourcetypes.ScheduleInfo {
+			for i := range scheduleInfos {
+				scheduleInfos[i].Capacity = 1
+			}
+			return scheduleInfos
+		}, len(nodes), nil)
+
+	return c, nodes
+}
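Extracting the mock topology into newCreateWorkloadCluster lets other tests in the package reuse the same two-node fixture, presumably including the lambda WAL tests added elsewhere in this commit. A hypothetical caller, in the same calcium test package as the diff above:

// TestFixtureReuse is an illustrative example, not part of the commit.
func TestFixtureReuse(t *testing.T) {
	c, nodes := newCreateWorkloadCluster(t)
	assert.NotNil(t, c.wal) // NewTestCluster already wired in a WAL
	assert.Len(t, nodes, 2) // the fixture's n1 and n2
}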

cluster/calcium/lambda.go

+31 -1
@@ -7,14 +7,20 @@ import (
 	"strconv"
 	"sync"

+	"github.com/google/uuid"
+
 	enginetypes "github.com/projecteru2/core/engine/types"
 	"github.com/projecteru2/core/log"
 	"github.com/projecteru2/core/strategy"
 	"github.com/projecteru2/core/types"
 	"github.com/projecteru2/core/utils"
+	"github.com/projecteru2/core/wal"
 )

-const exitDataPrefix = "[exitcode] "
+const (
+	exitDataPrefix = "[exitcode] "
+	labelLambdaID  = "LambdaID"
+)

 // RunAndWait implement lambda
 func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachWorkloadMessage, error) {
@@ -28,6 +34,10 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
 		return nil, types.ErrRunAndWaitCountOneWithStdin
 	}

+	commit, err := c.walCreateLambda(ctx, opts)
+	if err != nil {
+		return nil, err
+	}
 	createChan, err := c.CreateWorkload(ctx, opts)
 	if err != nil {
 		log.Errorf("[RunAndWait] Create workload error %s", err)
@@ -113,8 +123,28 @@
 	go func() {
 		defer close(runMsgCh)
 		wg.Wait()
+		if err := commit(context.Background()); err != nil {
+			log.Errorf("[RunAndWait] Commit WAL %s failed", eventCreateLambda)
+		}
 		log.Info("[RunAndWait] Finish run and wait for workloads")
 	}()

 	return runMsgCh, nil
 }
+
+func (c *Calcium) walCreateLambda(ctx context.Context, opts *types.DeployOptions) (wal.Commit, error) {
+	uid, err := uuid.NewRandom()
+	if err != nil {
+		return nil, err
+	}
+
+	lambdaID := uid.String()
+
+	if opts.Labels != nil {
+		opts.Labels[labelLambdaID] = lambdaID
+	} else {
+		opts.Labels = map[string]string{labelLambdaID: lambdaID}
+	}
+
+	return c.wal.logCreateLambda(ctx, opts)
+}
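The replay half of the pattern is in a file this view does not show. Its likely shape: on startup the WAL hands each logged-but-uncommitted create-lambda event back to a handler, which removes any workloads still carrying that event's LambdaID label, since the client that was attached to them is gone. The sketch below is an assumption, not quoted code: the handler name and the exact ListWorkloads/RemoveWorkload signatures are guesses layered on the package shown above.

// handleCreateLambda is a hypothetical replay handler for uncommitted
// create-lambda WAL events, written as if it lived in package calcium
// alongside lambda.go; ListWorkloads/RemoveWorkload signatures are assumed.
func (c *Calcium) handleCreateLambda(ctx context.Context, opts *types.DeployOptions) error {
	lambdaID, ok := opts.Labels[labelLambdaID]
	if !ok {
		return nil
	}
	// Find whatever workloads the interrupted RunAndWait managed to create.
	workloads, err := c.ListWorkloads(ctx, &types.ListWorkloadsOptions{
		Labels: map[string]string{labelLambdaID: lambdaID},
	})
	if err != nil {
		return err
	}
	ids := make([]string, 0, len(workloads))
	for _, workload := range workloads {
		ids = append(ids, workload.ID)
	}
	if len(ids) == 0 {
		return nil
	}
	// Force-remove them; nobody is attached to their output anymore.
	ch, err := c.RemoveWorkload(ctx, ids, true, 0)
	if err != nil {
		return err
	}
	for range ch { // drain so the removal runs to completion
	}
	return nil
}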
