Skip to content

Commit 11f86cd

Browse files
authored
refactor: new send and copy (#287)
* bugfix: minor revised copy RPC interface * refactor: send and copy
1 parent 046e3bd commit 11f86cd

File tree

12 files changed

+377
-404
lines changed

12 files changed

+377
-404
lines changed

Makefile

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ unit-test:
4141
./source/common/... \
4242
./strategy/... \
4343
./scheduler/complex/... \
44-
./rpc/. ./lock/etcdlock/... \
44+
./rpc/. \
45+
./lock/etcdlock/... \
4546
./auth/simple/... \
4647
./cluster/calcium/... \
4748
./discovery/helium... \

cluster/calcium/build.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"io"
9+
"io/ioutil"
910
"os"
1011
"time"
1112

@@ -117,8 +118,7 @@ func (c *Calcium) pushImage(ctx context.Context, resp io.ReadCloser, node *types
117118
lastMessage.Error = err.Error()
118119
break
119120
}
120-
malformed := []byte{}
121-
_, _ = decoder.Buffered().Read(malformed)
121+
malformed, _ := ioutil.ReadAll(decoder.Buffered()) // TODO err check
122122
log.Errorf("[BuildImage] Decode build image message failed %v, buffered: %v", err, malformed)
123123
return
124124
}

cluster/calcium/copy.go

+10-21
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"sync"
66

7-
"github.com/projecteru2/core/cluster"
87
"github.com/projecteru2/core/types"
98
log "github.com/sirupsen/logrus"
109
)
@@ -17,30 +16,20 @@ func (c *Calcium) Copy(ctx context.Context, opts *types.CopyOptions) (chan *type
1716
wg := sync.WaitGroup{}
1817
log.Infof("[Copy] Copy %d workloads files", len(opts.Targets))
1918
// workload one by one
20-
for cid, paths := range opts.Targets {
19+
for ID, paths := range opts.Targets {
2120
wg.Add(1)
22-
go func(cid string, paths []string) {
21+
go func(ID string, paths []string) {
2322
defer wg.Done()
24-
workload, err := c.GetWorkload(ctx, cid)
25-
if err != nil {
26-
log.Errorf("[Copy] Error when get workload %s, err %v", cid, err)
27-
ch <- makeCopyMessage(cid, cluster.CopyFailed, "", "", err, nil)
28-
return
29-
}
30-
for _, path := range paths {
31-
wg.Add(1)
32-
go func(path string) {
33-
defer wg.Done()
23+
if err := c.withWorkloadLocked(ctx, ID, func(workload *types.Workload) error {
24+
for _, path := range paths {
3425
resp, name, err := workload.Engine.VirtualizationCopyFrom(ctx, workload.ID, path)
35-
if err != nil {
36-
log.Errorf("[Copy] Error during CopyFromWorkload: %v", err)
37-
ch <- makeCopyMessage(cid, cluster.CopyFailed, "", path, err, nil)
38-
return
39-
}
40-
ch <- makeCopyMessage(cid, cluster.CopyOK, name, path, nil, resp)
41-
}(path)
26+
ch <- makeCopyMessage(ID, name, path, err, resp)
27+
}
28+
return nil
29+
}); err != nil {
30+
ch <- makeCopyMessage(ID, "", "", err, nil)
4231
}
43-
}(cid, paths)
32+
}(ID, paths)
4433
}
4534
wg.Wait()
4635
}()

cluster/calcium/copy_test.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/stretchr/testify/assert"
88

99
enginemocks "github.com/projecteru2/core/engine/mocks"
10+
lockmocks "github.com/projecteru2/core/lock/mocks"
1011
storemocks "github.com/projecteru2/core/store/mocks"
1112
"github.com/projecteru2/core/types"
1213
"github.com/stretchr/testify/mock"
@@ -24,18 +25,23 @@ func TestCopy(t *testing.T) {
2425
},
2526
}
2627
store := &storemocks.Store{}
28+
lock := &lockmocks.DistributedLock{}
29+
lock.On("Lock", mock.Anything).Return(nil)
30+
lock.On("Unlock", mock.Anything).Return(nil)
31+
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
2732
c.store = store
2833
// failed by GetWorkload
29-
store.On("GetWorkload", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
34+
store.On("GetWorkloads", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
3035
ch, err := c.Copy(ctx, opts)
3136
assert.NoError(t, err)
3237
for r := range ch {
3338
assert.Error(t, r.Error)
3439
}
3540
workload := &types.Workload{ID: "cid"}
41+
workloads := []*types.Workload{workload}
3642
engine := &enginemocks.API{}
3743
workload.Engine = engine
38-
store.On("GetWorkload", mock.Anything, mock.Anything).Return(workload, nil)
44+
store.On("GetWorkloads", mock.Anything, mock.Anything).Return(workloads, nil)
3945
// failed by VirtualizationCopyFrom
4046
engine.On("VirtualizationCopyFrom", mock.Anything, mock.Anything, mock.Anything).Return(nil, "", types.ErrNilEngine).Twice()
4147
ch, err = c.Copy(ctx, opts)

cluster/calcium/helper.go

+15-17
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"fmt"
77
"io"
8+
"io/ioutil"
89

910
"github.com/projecteru2/core/engine"
1011
enginetypes "github.com/projecteru2/core/engine/types"
@@ -32,17 +33,17 @@ func execuateInside(ctx context.Context, client engine.API, ID, cmd, user string
3233
AttachStderr: true,
3334
AttachStdout: true,
3435
}
36+
b := []byte{}
3537
execID, err := client.ExecCreate(ctx, ID, execConfig)
3638
if err != nil {
37-
return []byte{}, err
39+
return b, err
3840
}
3941

4042
outStream, _, err := client.ExecAttach(ctx, execID, false)
4143
if err != nil {
42-
return []byte{}, err
44+
return b, err
4345
}
4446

45-
b := []byte{}
4647
for data := range processVirtualizationOutStream(ctx, outStream) {
4748
b = append(b, data...)
4849
}
@@ -107,14 +108,13 @@ func pullImage(ctx context.Context, node *types.Node, image string) error {
107108
return nil
108109
}
109110

110-
func makeCopyMessage(id, status, name, path string, err error, data io.ReadCloser) *types.CopyMessage {
111+
func makeCopyMessage(id, name, path string, err error, data io.ReadCloser) *types.CopyMessage {
111112
return &types.CopyMessage{
112-
ID: id,
113-
Status: status,
114-
Name: name,
115-
Path: path,
116-
Error: err,
117-
Data: data,
113+
ID: id,
114+
Name: name,
115+
Path: path,
116+
Error: err,
117+
Data: data,
118118
}
119119
}
120120

@@ -201,14 +201,12 @@ func processBuildImageStream(reader io.ReadCloser) chan *types.BuildImageMessage
201201
message := &types.BuildImageMessage{}
202202
err := decoder.Decode(message)
203203
if err != nil {
204-
if err == io.EOF {
205-
break
204+
if err != io.EOF {
205+
malformed, _ := ioutil.ReadAll(decoder.Buffered()) // TODO err check
206+
log.Errorf("[processBuildImageStream] Decode image message failed %v, buffered: %s", err, string(malformed))
207+
message.Error = err.Error()
208+
ch <- message
206209
}
207-
malformed := []byte{}
208-
_, _ = decoder.Buffered().Read(malformed)
209-
log.Errorf("[processBuildImageStream] Decode image message failed %v, buffered: %s", err, string(malformed))
210-
message.Error = err.Error()
211-
ch <- message
212210
break
213211
}
214212
ch <- message

cluster/calcium/send.go

+11-13
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,22 @@ func (c *Calcium) Send(ctx context.Context, opts *types.SendOptions) (chan *type
1717
go func() {
1818
defer close(ch)
1919
wg := &sync.WaitGroup{}
20-
for dst, content := range opts.Data {
21-
log.Infof("[Send] Send files to %s", dst)
20+
21+
for _, ID := range opts.IDs {
22+
log.Infof("[Send] Send files to %s", ID)
2223
wg.Add(1)
23-
go func(dst string, content []byte) {
24+
go func(ID string) {
2425
defer wg.Done()
25-
for _, ID := range opts.IDs {
26-
workload, err := c.GetWorkload(ctx, ID)
27-
if err != nil {
28-
ch <- &types.SendMessage{ID: ID, Path: dst, Error: err}
29-
continue
30-
}
31-
if err := c.doSendFileToWorkload(ctx, workload.Engine, workload.ID, dst, bytes.NewBuffer(content), true, true); err != nil {
26+
if err := c.withWorkloadLocked(ctx, ID, func(workload *types.Workload) error {
27+
for dst, content := range opts.Data {
28+
err := c.doSendFileToWorkload(ctx, workload.Engine, workload.ID, dst, bytes.NewBuffer(content), true, true)
3229
ch <- &types.SendMessage{ID: ID, Path: dst, Error: err}
33-
continue
3430
}
35-
ch <- &types.SendMessage{ID: ID, Path: dst}
31+
return nil
32+
}); err != nil {
33+
ch <- &types.SendMessage{ID: ID, Error: err}
3634
}
37-
}(dst, content)
35+
}(ID)
3836
}
3937
wg.Wait()
4038
}()

cluster/calcium/send_test.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88

99
enginemocks "github.com/projecteru2/core/engine/mocks"
10+
lockmocks "github.com/projecteru2/core/lock/mocks"
1011
storemocks "github.com/projecteru2/core/store/mocks"
1112
"github.com/projecteru2/core/types"
1213
"github.com/stretchr/testify/assert"
@@ -28,16 +29,20 @@ func TestSend(t *testing.T) {
2829
}
2930
store := &storemocks.Store{}
3031
c.store = store
32+
lock := &lockmocks.DistributedLock{}
33+
lock.On("Lock", mock.Anything).Return(nil)
34+
lock.On("Unlock", mock.Anything).Return(nil)
35+
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
3136
// failed by GetWorkload
32-
store.On("GetWorkload", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
37+
store.On("GetWorkloads", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
3338
ch, err := c.Send(ctx, opts)
3439
assert.NoError(t, err)
3540
for r := range ch {
3641
assert.Error(t, r.Error)
3742
}
3843
engine := &enginemocks.API{}
39-
store.On("GetWorkload", mock.Anything, mock.Anything).Return(
40-
&types.Workload{Engine: engine}, nil,
44+
store.On("GetWorkloads", mock.Anything, mock.Anything).Return(
45+
[]*types.Workload{{ID: "cid", Engine: engine}}, nil,
4146
)
4247
// failed by engine
4348
content, _ := ioutil.ReadAll(tmpfile)

cluster/cluster.go

-4
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ const (
1212
Gitlab = "gitlab"
1313
// Github for github
1414
Github = "github"
15-
// CopyFailed for copy failed
16-
CopyFailed = "failed"
17-
// CopyOK for copy ok
18-
CopyOK = "ok"
1915
// CPUPeriodBase for cpu period base
2016
CPUPeriodBase = 100000
2117
// ERUMark mark workload controlled by eru

0 commit comments

Comments
 (0)