Skip to content

Commit 766a9e0

Browse files
authored
return workload id in lambda (#385)
* return workload id in lambda * ... should be go 1.16
1 parent f7b4343 commit 766a9e0

File tree

2 files changed

+146
-0
lines changed

2 files changed

+146
-0
lines changed

cluster/calcium/lambda.go

+6
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,12 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
9595
splitFunc, split = bufio.ScanBytes, byte(0)
9696
}
9797

98+
// return workload id as first message for lambda
99+
runMsgCh <- &types.AttachWorkloadMessage{
100+
WorkloadID: message.WorkloadID,
101+
Data: []byte(""),
102+
StdStreamType: types.Stdout,
103+
}
98104
for m := range processStdStream(ctx, stdout, stderr, splitFunc, split) {
99105
runMsgCh <- &types.AttachWorkloadMessage{
100106
WorkloadID: message.WorkloadID,

cluster/calcium/lambda_test.go

+140
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,20 @@ package calcium
33
import (
44
"context"
55
"fmt"
6+
"io"
7+
"io/ioutil"
68
"testing"
79

10+
"github.com/stretchr/testify/assert"
811
"github.com/stretchr/testify/mock"
912
"github.com/stretchr/testify/require"
1013

14+
enginemocks "github.com/projecteru2/core/engine/mocks"
15+
enginetypes "github.com/projecteru2/core/engine/types"
16+
lockmocks "github.com/projecteru2/core/lock/mocks"
17+
resourcetypes "github.com/projecteru2/core/resources/types"
18+
"github.com/projecteru2/core/scheduler"
19+
schedulermocks "github.com/projecteru2/core/scheduler/mocks"
1120
storemocks "github.com/projecteru2/core/store/mocks"
1221
"github.com/projecteru2/core/strategy"
1322
"github.com/projecteru2/core/types"
@@ -54,3 +63,134 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
5463
require.True(t, len(lambdaID) > 1)
5564
require.True(t, walCommitted)
5665
}
66+
67+
func TestLambdaWithWorkloadIDReturned(t *testing.T) {
68+
c, _ := newLambdaCluster(t)
69+
70+
opts := &types.DeployOptions{
71+
Name: "zc:name",
72+
Count: 2,
73+
DeployStrategy: strategy.Auto,
74+
Podname: "p1",
75+
ResourceOpts: types.ResourceOptions{CPUQuotaLimit: 1},
76+
Image: "zc:test",
77+
Entrypoint: &types.Entrypoint{
78+
Name: "good-entrypoint",
79+
},
80+
}
81+
82+
ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
83+
assert.NoError(t, err)
84+
assert.NotNil(t, ch)
85+
86+
m := <-ch
87+
require.Equal(t, m.WorkloadID, "workloadfortonictest")
88+
}
89+
90+
func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) {
91+
c, nodes := newCreateWorkloadCluster(t)
92+
93+
store := &storemocks.Store{}
94+
sche := &schedulermocks.Scheduler{}
95+
scheduler.InitSchedulerV1(sche)
96+
c.store = store
97+
c.scheduler = sche
98+
99+
node1, node2 := nodes[0], nodes[1]
100+
101+
store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
102+
store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
103+
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
104+
105+
lock := &lockmocks.DistributedLock{}
106+
lock.On("Lock", mock.Anything).Return(context.Background(), nil)
107+
lock.On("Unlock", mock.Anything).Return(nil)
108+
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
109+
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
110+
store.On("GetNode",
111+
mock.AnythingOfType("*context.emptyCtx"),
112+
mock.AnythingOfType("string"),
113+
).Return(
114+
func(_ context.Context, name string) (node *types.Node) {
115+
node = node1
116+
if name == "n2" {
117+
node = node2
118+
}
119+
return
120+
}, nil)
121+
sche.On("SelectStorageNodes", mock.AnythingOfType("[]resourcetypes.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
122+
return scheduleInfos
123+
}, len(nodes), nil)
124+
sche.On("SelectStorageNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
125+
return scheduleInfos
126+
}, len(nodes), nil)
127+
sche.On("SelectVolumeNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("types.VolumeBindings")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ types.VolumeBindings) []resourcetypes.ScheduleInfo {
128+
return scheduleInfos
129+
}, nil, len(nodes), nil)
130+
sche.On("SelectMemoryNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("float64"), mock.AnythingOfType("int64")).Return(
131+
func(scheduleInfos []resourcetypes.ScheduleInfo, _ float64, _ int64) []resourcetypes.ScheduleInfo {
132+
for i := range scheduleInfos {
133+
scheduleInfos[i].Capacity = 1
134+
}
135+
return scheduleInfos
136+
}, len(nodes), nil)
137+
138+
store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil)
139+
old := strategy.Plans[strategy.Auto]
140+
strategy.Plans[strategy.Auto] = func(sis []strategy.Info, need, total, _ int) (map[string]int, error) {
141+
deployInfos := make(map[string]int)
142+
for _, si := range sis {
143+
deployInfos[si.Nodename] = 1
144+
}
145+
return deployInfos, nil
146+
}
147+
defer func() {
148+
strategy.Plans[strategy.Auto] = old
149+
}()
150+
151+
store.On("UpdateNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil)
152+
store.On("UpdateNodes", mock.Anything, mock.Anything).Return(nil)
153+
store.On("GetNode",
154+
mock.AnythingOfType("*context.timerCtx"),
155+
mock.AnythingOfType("string"),
156+
).Return(
157+
func(_ context.Context, name string) (node *types.Node) {
158+
node = node1
159+
if name == "n2" {
160+
node = node2
161+
}
162+
return
163+
}, nil)
164+
engine := node1.Engine.(*enginemocks.API)
165+
166+
// doDeployOneWorkload fails: VirtualizationCreate
167+
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return([]string{""}, nil)
168+
engine.On("ImageRemoteDigest", mock.Anything, mock.Anything).Return("", nil)
169+
170+
r1, w1 := io.Pipe()
171+
go func() {
172+
w1.Write([]byte("stdout line1\n"))
173+
w1.Write([]byte("stdout line2\n"))
174+
w1.Close()
175+
}()
176+
r2, w2 := io.Pipe()
177+
go func() {
178+
w2.Write([]byte("stderr line1\n"))
179+
w2.Write([]byte("stderr line2\n"))
180+
w2.Close()
181+
}()
182+
engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(ioutil.NopCloser(r1), ioutil.NopCloser(r2), nil)
183+
engine.On("VirtualizationWait", mock.Anything, mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationWaitResult{Code: 0})
184+
185+
// doCreateAndStartWorkload fails: AddWorkload
186+
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationCreated{ID: "workloadfortonictest"}, nil)
187+
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
188+
engine.On("VirtualizationRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
189+
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD)
190+
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
191+
store.On("AddWorkload", mock.Anything, mock.Anything).Return(nil)
192+
193+
workload := &types.Workload{ID: "workloadfortonictest", Engine: engine}
194+
store.On("GetWorkload", mock.Anything, mock.Anything).Return(workload, nil)
195+
return c, nodes
196+
}

0 commit comments

Comments
 (0)