Skip to content

Commit 9a2dea0

Browse files
committed
add control number to identify
1 parent 4464cb0 commit 9a2dea0

File tree

7 files changed

+216
-192
lines changed

7 files changed

+216
-192
lines changed

cluster/calcium/lambda.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
5858
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
5959
WorkloadID: "",
6060
Data: []byte(fmt.Sprintf("Create workload failed %+v", message.Error)),
61-
StdStreamType: types.Stderr,
61+
StdStreamType: types.EruError,
6262
})
6363
continue
6464
}
@@ -79,7 +79,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
7979
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
8080
WorkloadID: message.WorkloadID,
8181
Data: []byte(fmt.Sprintf("Get workload %s failed %+v", message.WorkloadID, err)),
82-
StdStreamType: types.Stderr,
82+
StdStreamType: types.EruError,
8383
})
8484
return
8585
}
@@ -95,7 +95,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
9595
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
9696
WorkloadID: message.WorkloadID,
9797
Data: []byte(fmt.Sprintf("Fetch log for workload %s failed %+v", message.WorkloadID, err)),
98-
StdStreamType: types.Stderr,
98+
StdStreamType: types.EruError,
9999
})
100100
return
101101
}
@@ -111,7 +111,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
111111
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
112112
WorkloadID: message.WorkloadID,
113113
Data: []byte(fmt.Sprintf("Attach to workload %s failed %+v", message.WorkloadID, err)),
114-
StdStreamType: types.Stderr,
114+
StdStreamType: types.EruError,
115115
})
116116
return
117117
}
@@ -127,7 +127,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
127127
runMsgCh <- &types.AttachWorkloadMessage{
128128
WorkloadID: message.WorkloadID,
129129
Data: []byte(""),
130-
StdStreamType: types.Stdout,
130+
StdStreamType: types.TypeWorkloadID,
131131
}
132132
for m := range processStdStream(ctx, stdout, stderr, splitFunc, split) {
133133
runMsgCh <- &types.AttachWorkloadMessage{
@@ -144,7 +144,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
144144
errorMessages = append(errorMessages, &types.AttachWorkloadMessage{
145145
WorkloadID: message.WorkloadID,
146146
Data: []byte(fmt.Sprintf("Wait workload %s failed %+v", message.WorkloadID, err)),
147-
StdStreamType: types.Stderr,
147+
StdStreamType: types.EruError,
148148
})
149149
return
150150
}
@@ -154,7 +154,11 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
154154
}
155155

156156
exitData := []byte(exitDataPrefix + strconv.Itoa(int(r.Code)))
157-
runMsgCh <- &types.AttachWorkloadMessage{WorkloadID: message.WorkloadID, Data: exitData}
157+
runMsgCh <- &types.AttachWorkloadMessage{
158+
WorkloadID: message.WorkloadID,
159+
Data: exitData,
160+
StdStreamType: types.Stdout,
161+
}
158162
}
159163

160164
wg.Add(1)

cluster/calcium/lambda_test.go

+10-13
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func TestRunAndWaitFailedThenWALCommitted(t *testing.T) {
6565
assert.True(exists)
6666
assert.True(len(lambdaID) > 1)
6767
assert.True(walCommitted)
68-
assert.Equal(m.StdStreamType, types.Stderr)
68+
assert.Equal(m.StdStreamType, types.EruError)
6969
}
7070

7171
func TestLambdaWithWorkloadIDReturned(t *testing.T) {
@@ -116,10 +116,9 @@ func TestLambdaWithWorkloadIDReturned(t *testing.T) {
116116
assert.Equal(len(ms), 8)
117117
assert.Equal(ms[0].WorkloadID, "workloadfortonictest")
118118
assert.Equal(ms[0].Data, []byte(""))
119-
assert.Equal(ms[1].WorkloadID, "workloadfortonictest")
120-
assert.Equal(ms[1].Data, []byte(""))
119+
assert.Equal(ms[0].StdStreamType, types.TypeWorkloadID)
121120
assert.True(strings.HasPrefix(string(ms[7].Data), exitDataPrefix))
122-
assert.True(strings.HasPrefix(string(ms[6].Data), exitDataPrefix))
121+
assert.Equal(ms[7].StdStreamType, types.Stdout)
123122
}
124123

125124
func TestLambdaWithError(t *testing.T) {
@@ -150,6 +149,7 @@ func TestLambdaWithError(t *testing.T) {
150149
m0 := <-ch0
151150
assert.Equal(m0.WorkloadID, "workloadfortonictest")
152151
assert.True(strings.HasPrefix(string(m0.Data), "Get workload"))
152+
assert.Equal(m0.StdStreamType, types.EruError)
153153

154154
store.On("GetWorkload", mock.Anything, mock.Anything).Return(workload, nil)
155155

@@ -160,6 +160,7 @@ func TestLambdaWithError(t *testing.T) {
160160
m1 := <-ch1
161161
assert.Equal(m1.WorkloadID, "workloadfortonictest")
162162
assert.True(strings.HasPrefix(string(m1.Data), "Fetch log for workload"))
163+
assert.Equal(m1.StdStreamType, types.EruError)
163164

164165
r1, w1 := io.Pipe()
165166
go func() {
@@ -187,15 +188,11 @@ func TestLambdaWithError(t *testing.T) {
187188
assert.Equal(len(ms), 8)
188189
assert.Equal(ms[0].WorkloadID, "workloadfortonictest")
189190
assert.Equal(ms[0].Data, []byte(""))
190-
assert.Equal(ms[1].WorkloadID, "workloadfortonictest")
191-
assert.Equal(ms[1].Data, []byte(""))
192-
193-
m2 := ms[7]
194-
assert.Equal(m2.WorkloadID, "workloadfortonictest")
195-
assert.True(strings.HasPrefix(string(m2.Data), "Wait workload"))
196-
m3 := ms[6]
197-
assert.Equal(m3.WorkloadID, "workloadfortonictest")
198-
assert.True(strings.HasPrefix(string(m3.Data), "Wait workload"))
191+
assert.Equal(ms[0].StdStreamType, types.TypeWorkloadID)
192+
193+
assert.Equal(ms[7].WorkloadID, "workloadfortonictest")
194+
assert.True(strings.HasPrefix(string(ms[7].Data), "Wait workload"))
195+
assert.Equal(ms[7].StdStreamType, types.EruError)
199196
}
200197

201198
func newLambdaCluster(t *testing.T) (*Calcium, []*types.Node) {

0 commit comments

Comments
 (0)