Skip to content

Commit 4464cb0

Browse files
committed
always return errors to client
1 parent 2350fdc commit 4464cb0

File tree

2 files changed

+165
-34
lines changed

2 files changed

+165
-34
lines changed

cluster/calcium/lambda.go

+41-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package calcium
33
import (
44
"bufio"
55
"context"
6+
"fmt"
67
"io"
78
"strconv"
89
"sync"
@@ -46,11 +47,19 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
4647
return nil, err
4748
}
4849

49-
runMsgCh := make(chan *types.AttachWorkloadMessage)
50-
wg := &sync.WaitGroup{}
50+
var (
51+
runMsgCh = make(chan *types.AttachWorkloadMessage)
52+
wg = &sync.WaitGroup{}
53+
errorMessages = []*types.AttachWorkloadMessage{}
54+
)
5155
for message := range createChan {
5256
if message.Error != nil || message.WorkloadID == "" {
5357
logger.Errorf("[RunAndWait] Create workload failed %+v", message.Error)
58+
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
59+
WorkloadID: "",
60+
Data: []byte(fmt.Sprintf("Create workload failed %+v", message.Error)),
61+
StdStreamType: types.Stderr,
62+
})
5463
continue
5564
}
5665

@@ -67,13 +76,27 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
6776
workload, err := c.GetWorkload(ctx, message.WorkloadID)
6877
if err != nil {
6978
logger.Errorf("[RunAndWait] Get workload failed %+v", err)
79+
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
80+
WorkloadID: message.WorkloadID,
81+
Data: []byte(fmt.Sprintf("Get workload %s failed %+v", message.WorkloadID, err)),
82+
StdStreamType: types.Stderr,
83+
})
7084
return
7185
}
7286

7387
var stdout, stderr io.ReadCloser
7488
if stdout, stderr, err = workload.Engine.VirtualizationLogs(ctx, &enginetypes.VirtualizationLogStreamOptions{
75-
ID: message.WorkloadID, Follow: true, Stdout: true, Stderr: true}); err != nil {
89+
ID: message.WorkloadID,
90+
Follow: true,
91+
Stdout: true,
92+
Stderr: true,
93+
}); err != nil {
7694
logger.Errorf("[RunAndWait] Can't fetch log of workload %s error %+v", message.WorkloadID, err)
95+
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
96+
WorkloadID: message.WorkloadID,
97+
Data: []byte(fmt.Sprintf("Fetch log for workload %s failed %+v", message.WorkloadID, err)),
98+
StdStreamType: types.Stderr,
99+
})
77100
return
78101
}
79102

@@ -85,6 +108,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
85108
stdout, stderr, inStream, err = workload.Engine.VirtualizationAttach(ctx, message.WorkloadID, true, true)
86109
if err != nil {
87110
logger.Errorf("[RunAndWait] Can't attach workload %s error %+v", message.WorkloadID, err)
111+
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
112+
WorkloadID: message.WorkloadID,
113+
Data: []byte(fmt.Sprintf("Attach to workload %s failed %+v", message.WorkloadID, err)),
114+
StdStreamType: types.Stderr,
115+
})
88116
return
89117
}
90118

@@ -95,7 +123,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
95123
splitFunc, split = bufio.ScanBytes, byte(0)
96124
}
97125

98-
// return workload id as first message for lambda
126+
// return workload id as first normal message for lambda
99127
runMsgCh <- &types.AttachWorkloadMessage{
100128
WorkloadID: message.WorkloadID,
101129
Data: []byte(""),
@@ -113,6 +141,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
113141
r, err := workload.Engine.VirtualizationWait(ctx, message.WorkloadID, "")
114142
if err != nil {
115143
logger.Errorf("[RunAndWait] %s wait failed %+v", utils.ShortID(message.WorkloadID), err)
144+
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
145+
WorkloadID: message.WorkloadID,
146+
Data: []byte(fmt.Sprintf("Wait workload %s failed %+v", message.WorkloadID, err)),
147+
StdStreamType: types.Stderr,
148+
})
116149
return
117150
}
118151

@@ -134,6 +167,10 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
134167
if err := commit(); err != nil {
135168
logger.Errorf("[RunAndWait] Commit WAL %s failed: %v", eventCreateLambda, err)
136169
}
170+
171+
for _, message := range errorMessages {
172+
runMsgCh <- message
173+
}
137174
log.Info("[RunAndWait] Finish run and wait for workloads")
138175
}()
139176

cluster/calcium/lambda_test.go

+124-30
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ import (
55
"fmt"
66
"io"
77
"io/ioutil"
8+
"strings"
89
"testing"
910

1011
"github.com/stretchr/testify/assert"
1112
"github.com/stretchr/testify/mock"
12-
"github.com/stretchr/testify/require"
1313

1414
enginemocks "github.com/projecteru2/core/engine/mocks"
1515
enginetypes "github.com/projecteru2/core/engine/types"
@@ -25,6 +25,7 @@ import (
2525
)
2626

2727
func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
28+
assert := assert.New(t)
2829
c, _ := newCreateWorkloadCluster(t)
2930
c.wal = &WAL{WAL: &walmocks.WAL{}}
3031

@@ -53,19 +54,29 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
5354
mstore.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("err")).Once()
5455

5556
ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
56-
require.NoError(t, err)
57-
require.NotNil(t, ch)
58-
require.False(t, walCommitted)
59-
require.Nil(t, <-ch) // recv nil due to the ch will be closed.
57+
assert.NoError(err)
58+
assert.NotNil(ch)
59+
assert.False(walCommitted)
60+
m := <-ch
61+
assert.Equal(m.WorkloadID, "")
62+
assert.True(strings.HasPrefix(string(m.Data), "Create workload failed"))
6063

6164
lambdaID, exists := opts.Labels[labelLambdaID]
62-
require.True(t, exists)
63-
require.True(t, len(lambdaID) > 1)
64-
require.True(t, walCommitted)
65+
assert.True(exists)
66+
assert.True(len(lambdaID) > 1)
67+
assert.True(walCommitted)
68+
assert.Equal(m.StdStreamType, types.Stderr)
6569
}
6670

6771
func TestLambdaWithWorkloadIDReturned(t *testing.T) {
68-
c, _ := newLambdaCluster(t)
72+
assert := assert.New(t)
73+
c, nodes := newLambdaCluster(t)
74+
engine := nodes[0].Engine.(*enginemocks.API)
75+
76+
workload := &types.Workload{ID: "workloadfortonictest", Engine: engine}
77+
store := c.store.(*storemocks.Store)
78+
store.On("GetWorkload", mock.Anything, mock.Anything).Return(workload, nil)
79+
store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)
6980

7081
opts := &types.DeployOptions{
7182
Name: "zc:name",
@@ -79,12 +90,112 @@ func TestLambdaWithWorkloadIDReturned(t *testing.T) {
7990
},
8091
}
8192

93+
r1, w1 := io.Pipe()
94+
go func() {
95+
w1.Write([]byte("stdout line1\n"))
96+
w1.Write([]byte("stdout line2\n"))
97+
w1.Close()
98+
}()
99+
r2, w2 := io.Pipe()
100+
go func() {
101+
w2.Write([]byte("stderr line1\n"))
102+
w2.Write([]byte("stderr line2\n"))
103+
w2.Close()
104+
}()
105+
engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(ioutil.NopCloser(r1), ioutil.NopCloser(r2), nil)
106+
engine.On("VirtualizationWait", mock.Anything, mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationWaitResult{Code: 0}, nil)
107+
82108
ch, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
83-
assert.NoError(t, err)
84-
assert.NotNil(t, ch)
109+
assert.NoError(err)
110+
assert.NotNil(ch)
85111

86-
m := <-ch
87-
require.Equal(t, m.WorkloadID, "workloadfortonictest")
112+
ms := []*types.AttachWorkloadMessage{}
113+
for m := range ch {
114+
ms = append(ms, m)
115+
}
116+
assert.Equal(len(ms), 8)
117+
assert.Equal(ms[0].WorkloadID, "workloadfortonictest")
118+
assert.Equal(ms[0].Data, []byte(""))
119+
assert.Equal(ms[1].WorkloadID, "workloadfortonictest")
120+
assert.Equal(ms[1].Data, []byte(""))
121+
assert.True(strings.HasPrefix(string(ms[7].Data), exitDataPrefix))
122+
assert.True(strings.HasPrefix(string(ms[6].Data), exitDataPrefix))
123+
}
124+
125+
func TestLambdaWithError(t *testing.T) {
126+
assert := assert.New(t)
127+
c, nodes := newLambdaCluster(t)
128+
engine := nodes[0].Engine.(*enginemocks.API)
129+
130+
workload := &types.Workload{ID: "workloadfortonictest", Engine: engine}
131+
store := c.store.(*storemocks.Store)
132+
store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil)
133+
134+
opts := &types.DeployOptions{
135+
Name: "zc:name",
136+
Count: 2,
137+
DeployStrategy: strategy.Auto,
138+
Podname: "p1",
139+
ResourceOpts: types.ResourceOptions{CPUQuotaLimit: 1},
140+
Image: "zc:test",
141+
Entrypoint: &types.Entrypoint{
142+
Name: "good-entrypoint",
143+
},
144+
}
145+
146+
store.On("GetWorkload", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("error")).Twice()
147+
ch0, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
148+
assert.NoError(err)
149+
assert.NotNil(ch0)
150+
m0 := <-ch0
151+
assert.Equal(m0.WorkloadID, "workloadfortonictest")
152+
assert.True(strings.HasPrefix(string(m0.Data), "Get workload"))
153+
154+
store.On("GetWorkload", mock.Anything, mock.Anything).Return(workload, nil)
155+
156+
engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("error")).Twice()
157+
ch1, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
158+
assert.NoError(err)
159+
assert.NotNil(ch1)
160+
m1 := <-ch1
161+
assert.Equal(m1.WorkloadID, "workloadfortonictest")
162+
assert.True(strings.HasPrefix(string(m1.Data), "Fetch log for workload"))
163+
164+
r1, w1 := io.Pipe()
165+
go func() {
166+
w1.Write([]byte("stdout line1\n"))
167+
w1.Write([]byte("stdout line2\n"))
168+
w1.Close()
169+
}()
170+
r2, w2 := io.Pipe()
171+
go func() {
172+
w2.Write([]byte("stderr line1\n"))
173+
w2.Write([]byte("stderr line2\n"))
174+
w2.Close()
175+
}()
176+
engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(ioutil.NopCloser(r1), ioutil.NopCloser(r2), nil)
177+
178+
engine.On("VirtualizationWait", mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("error"))
179+
ch2, err := c.RunAndWait(context.Background(), opts, make(chan []byte))
180+
assert.NoError(err)
181+
assert.NotNil(ch2)
182+
183+
ms := []*types.AttachWorkloadMessage{}
184+
for m := range ch2 {
185+
ms = append(ms, m)
186+
}
187+
assert.Equal(len(ms), 8)
188+
assert.Equal(ms[0].WorkloadID, "workloadfortonictest")
189+
assert.Equal(ms[0].Data, []byte(""))
190+
assert.Equal(ms[1].WorkloadID, "workloadfortonictest")
191+
assert.Equal(ms[1].Data, []byte(""))
192+
193+
m2 := ms[7]
194+
assert.Equal(m2.WorkloadID, "workloadfortonictest")
195+
assert.True(strings.HasPrefix(string(m2.Data), "Wait workload"))
196+
m3 := ms[6]
197+
assert.Equal(m3.WorkloadID, "workloadfortonictest")
198+
assert.True(strings.HasPrefix(string(m3.Data), "Wait workload"))
88199
}
89200

90201
func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) {
@@ -167,21 +278,6 @@ func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) {
167278
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return([]string{""}, nil)
168279
engine.On("ImageRemoteDigest", mock.Anything, mock.Anything).Return("", nil)
169280

170-
r1, w1 := io.Pipe()
171-
go func() {
172-
w1.Write([]byte("stdout line1\n"))
173-
w1.Write([]byte("stdout line2\n"))
174-
w1.Close()
175-
}()
176-
r2, w2 := io.Pipe()
177-
go func() {
178-
w2.Write([]byte("stderr line1\n"))
179-
w2.Write([]byte("stderr line2\n"))
180-
w2.Close()
181-
}()
182-
engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(ioutil.NopCloser(r1), ioutil.NopCloser(r2), nil)
183-
engine.On("VirtualizationWait", mock.Anything, mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationWaitResult{Code: 0})
184-
185281
// doCreateAndStartWorkload fails: AddWorkload
186282
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationCreated{ID: "workloadfortonictest"}, nil)
187283
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
@@ -190,7 +286,5 @@ func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) {
190286
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
191287
store.On("AddWorkload", mock.Anything, mock.Anything).Return(nil)
192288

193-
workload := &types.Workload{ID: "workloadfortonictest", Engine: engine}
194-
store.On("GetWorkload", mock.Anything, mock.Anything).Return(workload, nil)
195289
return c, nodes
196290
}

0 commit comments

Comments
 (0)