Skip to content

Commit 825eaf0

Browse files
authored
support send large file to workload (#532)
* Add send large file * Finished the code and can compile successfully, but it hasn't been tested yet * now it can send large file chunk by chunk, but still need to make some adjustments * add file size in metadata * now it can send servel large file, but sometimes they faild and i don't know the reason * modify return parameters for `SendLargeFile` * bug fix * modify the definition of grpc * delete some annotation and modify the code * fix dead lock, modify chunk size * bug fix * add some annotation * rollback makefile * modify the function 'newWorkloadExecutor' and let it just send one file * modify the RPC method 'send', let it call 'SendLargeFile' * modify the struct of 'SendLargeFileOptions' * clean up the code * clean up the code * regenerate mock * delete debug code * add test for SendLarge in cluster * replace copychunk to copy in send * let chunkSize as a const * bug fix: need to add waitgroup in the very beginning * lint * use defer * fix bug that cannot return err in goroutine * use different err parameter in function * adjust the code for CR
1 parent dd058ca commit 825eaf0

34 files changed

+2241
-1041
lines changed

3rdmocks/ServerStream.go

+5-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cluster/calcium/replace_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func TestReplaceWorkload(t *testing.T) {
184184

185185
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationCreated{ID: "new"}, nil)
186186
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
187-
engine.On("VirtualizationCopyTo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
187+
engine.On("VirtualizationCopyChunkTo", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
188188
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{User: "test"}, nil)
189189
store.On("AddWorkload", mock.Anything, mock.Anything, mock.Anything).Return(nil)
190190
// failed by remove workload

cluster/calcium/send.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package calcium
22

33
import (
4+
"bytes"
45
"context"
56
"sync"
67

@@ -48,5 +49,5 @@ func (c *Calcium) Send(ctx context.Context, opts *types.SendOptions) (chan *type
4849

4950
func (c *Calcium) doSendFileToWorkload(ctx context.Context, engine engine.API, ID string, file types.LinuxFile) error {
5051
log.WithFunc("calcium.doSendFileToWorkload").Infof(ctx, "Send file to %s:%s", ID, file.Filename)
51-
return engine.VirtualizationCopyTo(ctx, ID, file.Filename, file.Clone().Content, file.UID, file.GID, file.Mode)
52+
return engine.VirtualizationCopyChunkTo(ctx, ID, file.Filename, int64(len(file.Content)), bytes.NewReader(file.Clone().Content), file.UID, file.GID, file.Mode)
5253
}

cluster/calcium/send_test.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ func TestSend(t *testing.T) {
1919
c := NewTestCluster()
2020
ctx := context.Background()
2121

22+
// 这部分是在测试参数合法性
2223
// failed by validating
2324
_, err := c.Send(ctx, &types.SendOptions{IDs: []string{}, Files: []types.LinuxFile{{Content: []byte("xxx")}}})
2425
assert.Error(t, err)
@@ -57,21 +58,21 @@ func TestSend(t *testing.T) {
5758
// failed by engine
5859
content, _ := io.ReadAll(tmpfile)
5960
opts.Files[0].Content = content
60-
engine.On("VirtualizationCopyTo",
61+
engine.On("VirtualizationCopyChunkTo",
6162
mock.Anything, mock.Anything, mock.Anything,
6263
mock.Anything, mock.Anything, mock.Anything,
63-
mock.Anything,
64+
mock.Anything, mock.Anything,
6465
).Return(types.ErrMockError).Once()
6566
ch, err = c.Send(ctx, opts)
6667
assert.NoError(t, err)
6768
for r := range ch {
6869
assert.Error(t, r.Error)
6970
}
7071
// success
71-
engine.On("VirtualizationCopyTo",
72+
engine.On("VirtualizationCopyChunkTo",
7273
mock.Anything, mock.Anything, mock.Anything,
7374
mock.Anything, mock.Anything, mock.Anything,
74-
mock.Anything,
75+
mock.Anything, mock.Anything,
7576
).Return(nil)
7677
ch, err = c.Send(ctx, opts)
7778
assert.NoError(t, err)

cluster/calcium/sendlarge.go

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package calcium
2+
3+
import (
4+
"context"
5+
"io"
6+
"sync"
7+
8+
"github.com/pkg/errors"
9+
"github.com/projecteru2/core/log"
10+
"github.com/projecteru2/core/types"
11+
"github.com/projecteru2/core/utils"
12+
)
13+
14+
// SendLargeFile send large files by stream to workload
15+
func (c *Calcium) SendLargeFile(ctx context.Context, inputChan chan *types.SendLargeFileOptions) chan *types.SendMessage {
16+
resp := make(chan *types.SendMessage)
17+
wg := &sync.WaitGroup{}
18+
utils.SentryGo(func() {
19+
defer close(resp)
20+
senders := make(map[string]*workloadSender)
21+
// for each file
22+
for data := range inputChan {
23+
for _, id := range data.Ids {
24+
if _, ok := senders[id]; !ok {
25+
log.Debugf(ctx, "[SendLargeFile] create sender for %s", id)
26+
// for each container, let's create a new sender to send identical file chunk, each chunk will include the metadata of this file
27+
// we need to add `waitGroup out of the `newWorkloadSender` because we need to avoid `wg.Wait()` be executing before `wg.Add()`,
28+
// which will cause the goroutine in `c.newWorkloadSender` to be killed.
29+
wg.Add(1)
30+
sender := c.newWorkloadSender(ctx, id, resp, wg)
31+
senders[id] = sender
32+
}
33+
senders[id].send(data)
34+
}
35+
}
36+
for _, sender := range senders {
37+
sender.close()
38+
}
39+
wg.Wait()
40+
})
41+
return resp
42+
}
43+
44+
type workloadSender struct {
45+
calcium *Calcium
46+
id string
47+
wg *sync.WaitGroup
48+
buffer chan *types.SendLargeFileOptions
49+
resp chan *types.SendMessage
50+
}
51+
52+
func (c *Calcium) newWorkloadSender(ctx context.Context, ID string, resp chan *types.SendMessage, wg *sync.WaitGroup) *workloadSender {
53+
sender := &workloadSender{
54+
calcium: c,
55+
id: ID,
56+
wg: wg,
57+
buffer: make(chan *types.SendLargeFileOptions, 10),
58+
resp: resp,
59+
}
60+
utils.SentryGo(func() {
61+
var writer *io.PipeWriter
62+
curFile := ""
63+
for data := range sender.buffer {
64+
if curFile != "" && curFile != data.Dst {
65+
log.Warnf(ctx, "[newWorkloadExecutor] receive different files %s, %s", curFile, data.Dst)
66+
break
67+
}
68+
// ready to send
69+
if curFile == "" {
70+
log.Debugf(ctx, "[newWorkloadExecutor]Receive new file %s to %s", curFile, sender.id)
71+
curFile = data.Dst
72+
pr, pw := io.Pipe()
73+
writer = pw
74+
utils.SentryGo(func(ID, name string, size int64, content io.Reader, uid, gid int, mode int64) func() {
75+
return func() {
76+
defer wg.Done()
77+
if err := sender.calcium.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error {
78+
err := errors.WithStack(workload.Engine.VirtualizationCopyChunkTo(ctx, ID, name, size, content, uid, gid, mode))
79+
resp <- &types.SendMessage{ID: ID, Path: name, Error: err}
80+
return nil
81+
}); err != nil {
82+
resp <- &types.SendMessage{ID: ID, Error: err}
83+
}
84+
}
85+
}(ID, curFile, data.Size, pr, data.UID, data.GID, data.Mode))
86+
}
87+
n, err := writer.Write(data.Chunk)
88+
if err != nil || n != len(data.Chunk) {
89+
log.Errorf(ctx, err, "[newWorkloadExecutor] send file to engine err, file = %s", curFile)
90+
break
91+
}
92+
}
93+
writer.Close()
94+
})
95+
return sender
96+
}
97+
98+
func (s *workloadSender) send(chunk *types.SendLargeFileOptions) {
99+
s.buffer <- chunk
100+
}
101+
102+
func (s *workloadSender) close() {
103+
close(s.buffer)
104+
}

cluster/calcium/sendlarge_test.go

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package calcium
2+
3+
import (
4+
"context"
5+
"io/ioutil"
6+
"os"
7+
"testing"
8+
9+
enginemocks "github.com/projecteru2/core/engine/mocks"
10+
lockmocks "github.com/projecteru2/core/lock/mocks"
11+
storemocks "github.com/projecteru2/core/store/mocks"
12+
"github.com/projecteru2/core/types"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/mock"
15+
)
16+
17+
func TestSendLarge(t *testing.T) {
18+
c := NewTestCluster()
19+
ctx := context.Background()
20+
21+
tmpfile, err := os.CreateTemp("", "example")
22+
assert.NoError(t, err)
23+
defer os.RemoveAll(tmpfile.Name())
24+
defer tmpfile.Close()
25+
opts := &types.SendLargeFileOptions{
26+
Ids: []string{"cid"},
27+
Size: 1,
28+
Dst: "/tmp/1",
29+
Chunk: []byte{},
30+
}
31+
optsChan := make(chan *types.SendLargeFileOptions)
32+
store := &storemocks.Store{}
33+
c.store = store
34+
lock := &lockmocks.DistributedLock{}
35+
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
36+
lock.On("Unlock", mock.Anything).Return(nil)
37+
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
38+
store.On("GetWorkloads", mock.Anything, mock.Anything).Return(nil, types.ErrMockError).Once()
39+
ch := c.SendLargeFile(ctx, optsChan)
40+
go func() {
41+
optsChan <- opts
42+
close(optsChan)
43+
}()
44+
for r := range ch {
45+
assert.Error(t, r.Error)
46+
}
47+
engine := &enginemocks.API{}
48+
store.On("GetWorkloads", mock.Anything, mock.Anything).Return(
49+
[]*types.Workload{{ID: "cid", Engine: engine}}, nil,
50+
)
51+
// failed by engine
52+
content, _ := ioutil.ReadAll(tmpfile)
53+
opts.Chunk = content
54+
engine.On("VirtualizationCopyChunkTo",
55+
mock.Anything, mock.Anything, mock.Anything,
56+
mock.Anything, mock.Anything, mock.Anything,
57+
mock.Anything, mock.Anything,
58+
).Return(types.ErrMockError).Once()
59+
optsChan = make(chan *types.SendLargeFileOptions)
60+
ch = c.SendLargeFile(ctx, optsChan)
61+
go func() {
62+
optsChan <- opts
63+
close(optsChan)
64+
}()
65+
for r := range ch {
66+
t.Log(r.Error)
67+
assert.Error(t, r.Error)
68+
}
69+
// success
70+
engine.On("VirtualizationCopyChunkTo",
71+
mock.Anything, mock.Anything, mock.Anything,
72+
mock.Anything, mock.Anything, mock.Anything,
73+
mock.Anything, mock.Anything,
74+
).Return(nil)
75+
optsChan = make(chan *types.SendLargeFileOptions)
76+
ch = c.SendLargeFile(ctx, optsChan)
77+
go func() {
78+
optsChan <- opts
79+
close(optsChan)
80+
}()
81+
for r := range ch {
82+
assert.Equal(t, r.ID, "cid")
83+
assert.Equal(t, r.Path, "/tmp/1")
84+
assert.NoError(t, r.Error)
85+
}
86+
}

cluster/cluster.go

+1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ type Cluster interface {
8080
// file methods
8181
Copy(ctx context.Context, opts *types.CopyOptions) (chan *types.CopyMessage, error)
8282
Send(ctx context.Context, opts *types.SendOptions) (chan *types.SendMessage, error)
83+
SendLargeFile(ctx context.Context, opts chan *types.SendLargeFileOptions) chan *types.SendMessage
8384
// image methods
8485
BuildImage(ctx context.Context, opts *types.BuildOptions) (chan *types.BuildImageMessage, error)
8586
CacheImage(ctx context.Context, opts *types.ImageOptions) (chan *types.CacheImageMessage, error)

0 commit comments

Comments
 (0)