Skip to content

Commit 108be32

Browse files
jschwinger233CMGS
authored andcommitted
intro LinuxFile type for Copy
1 parent 1cc1e89 commit 108be32

File tree

11 files changed

+121
-146
lines changed

11 files changed

+121
-146
lines changed

cluster/calcium/copy.go

+20-3
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,30 @@ func (c *Calcium) Copy(ctx context.Context, opts *types.CopyOptions) (chan *type
3333

3434
workload, err := c.GetWorkload(ctx, id)
3535
if err != nil {
36-
ch <- makeCopyMessage(id, "", "", logger.Err(ctx, err), nil)
36+
for _, path := range paths {
37+
ch <- &types.CopyMessage{
38+
ID: id,
39+
Path: path,
40+
Error: logger.Err(ctx, err),
41+
}
42+
}
3743
return
3844
}
3945

4046
for _, path := range paths {
41-
resp, name, err := workload.Engine.VirtualizationCopyFrom(ctx, workload.ID, path)
42-
ch <- makeCopyMessage(id, name, path, err, resp)
47+
content, uid, gid, mode, err := workload.Engine.VirtualizationCopyFrom(ctx, workload.ID, path)
48+
ch <- &types.CopyMessage{
49+
ID: id,
50+
Path: path,
51+
Error: err,
52+
LinuxFile: types.LinuxFile{
53+
Filename: path,
54+
Content: content,
55+
UID: uid,
56+
GID: gid,
57+
Mode: mode,
58+
},
59+
}
4360
}
4461
}
4562
}(id, paths))

cluster/calcium/create.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -300,13 +300,9 @@ func (c *Calcium) doDeployOneWorkload(
300300

301301
func(ctx context.Context) (err error) {
302302
// Copy data to workload
303-
if len(opts.Data) > 0 {
304-
for dst, readerManager := range opts.Data {
305-
reader, err := readerManager.GetReader()
306-
if err != nil {
307-
return errors.WithStack(err)
308-
}
309-
if err = c.doSendFileToWorkload(ctx, node.Engine, workload.ID, dst, reader, true, false); err != nil {
303+
if len(opts.Files) > 0 {
304+
for _, file := range opts.Files {
305+
if err = c.doSendFileToWorkload(ctx, node.Engine, workload.ID, file); err != nil {
310306
return err
311307
}
312308
}

cluster/calcium/helper.go

-10
Original file line numberDiff line numberDiff line change
@@ -106,16 +106,6 @@ func pullImage(ctx context.Context, node *types.Node, image string) error {
106106
return nil
107107
}
108108

109-
func makeCopyMessage(id, name, path string, err error, data io.ReadCloser) *types.CopyMessage {
110-
return &types.CopyMessage{
111-
ID: id,
112-
Name: name,
113-
Path: path,
114-
Error: err,
115-
Data: data,
116-
}
117-
}
118-
119109
func processVirtualizationInStream(
120110
ctx context.Context,
121111
inStream io.WriteCloser,

cluster/calcium/replace.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -133,13 +133,17 @@ func (c *Calcium) doReplaceWorkload(
133133
}
134134
// 获得文件 io
135135
for src, dst := range opts.Copy {
136-
stream, _, err := workload.Engine.VirtualizationCopyFrom(ctx, workload.ID, src)
136+
content, uid, gid, mode, err := workload.Engine.VirtualizationCopyFrom(ctx, workload.ID, src)
137137
if err != nil {
138138
return nil, removeMessage, errors.WithStack(err)
139139
}
140-
if opts.DeployOptions.Data[dst], err = types.NewReaderManager(stream); err != nil {
141-
return nil, removeMessage, errors.WithStack(err)
142-
}
140+
opts.DeployOptions.Files = append(opts.DeployOptions.Files, types.LinuxFile{
141+
Filename: dst,
142+
Content: content,
143+
UID: uid,
144+
GID: gid,
145+
Mode: mode,
146+
})
143147
}
144148

145149
createMessage := &types.CreateWorkloadMessage{

cluster/calcium/send.go

+6-9
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
package calcium
22

33
import (
4-
"bytes"
54
"context"
6-
"io"
75
"sync"
86

97
"github.com/projecteru2/core/engine"
@@ -33,9 +31,9 @@ func (c *Calcium) Send(ctx context.Context, opts *types.SendOptions) (chan *type
3331

3432
defer wg.Done()
3533
if err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error {
36-
for dst, content := range opts.Data {
37-
err := c.doSendFileToWorkload(ctx, workload.Engine, workload.ID, dst, bytes.NewBuffer(content), true, true)
38-
ch <- &types.SendMessage{ID: id, Path: dst, Error: logger.Err(ctx, err)}
34+
for _, file := range opts.Files {
35+
err := c.doSendFileToWorkload(ctx, workload.Engine, workload.ID, file)
36+
ch <- &types.SendMessage{ID: id, Path: file.Filename, Error: logger.Err(ctx, err)}
3937
}
4038
return nil
4139
}); err != nil {
@@ -50,8 +48,7 @@ func (c *Calcium) Send(ctx context.Context, opts *types.SendOptions) (chan *type
5048
return ch, nil
5149
}
5250

53-
func (c *Calcium) doSendFileToWorkload(ctx context.Context, engine engine.API, ID, dst string, content io.Reader, AllowOverwriteDirWithFile bool, CopyUIDGID bool) error {
54-
log.Infof(ctx, "[doSendFileToWorkload] Send file to %s:%s", ID, dst)
55-
log.Debugf(ctx, "[doSendFileToWorkload] remote path %s", dst)
56-
return errors.WithStack(engine.VirtualizationCopyTo(ctx, ID, dst, content, AllowOverwriteDirWithFile, CopyUIDGID))
51+
func (c *Calcium) doSendFileToWorkload(ctx context.Context, engine engine.API, ID string, file types.LinuxFile) error {
52+
log.Infof(ctx, "[doSendFileToWorkload] Send file to %s:%s", ID)
53+
return errors.WithStack(engine.VirtualizationCopyTo(ctx, ID, file.Filename, file.Clone().Content, file.UID, file.GID, file.Mode))
5754
}

rpc/rpc.go

+12-10
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"bufio"
66
"fmt"
77
"io"
8-
"io/ioutil"
8+
"path/filepath"
99
"runtime"
1010
"sync"
1111
"time"
@@ -447,7 +447,10 @@ func (v *Vibranium) Copy(opts *pb.CopyOptions, stream pb.CoreRPC_CopyServer) err
447447
// 4K buffer
448448
p := make([]byte, 4096)
449449
for m := range ch {
450-
msg := &pb.CopyMessage{Id: m.ID, Name: m.Name, Path: m.Path}
450+
msg := &pb.CopyMessage{
451+
Id: m.ID,
452+
Path: m.Path,
453+
}
451454
if m.Error != nil {
452455
msg.Error = m.Error.Error()
453456
if err := stream.Send(msg); err != nil {
@@ -463,21 +466,21 @@ func (v *Vibranium) Copy(opts *pb.CopyOptions, stream pb.CoreRPC_CopyServer) err
463466
defer func() {
464467
w.CloseWithError(err) // nolint
465468
}()
466-
defer m.Data.Close()
467469

468-
var bs []byte
469470
tw := tar.NewWriter(w)
470471
defer tw.Close()
471-
if bs, err = ioutil.ReadAll(m.Data); err != nil {
472-
log.Errorf(ctx, "[Copy] Error during extracting copy data: %v", err)
473-
return
472+
header := &tar.Header{
473+
Name: filepath.Base(m.Filename),
474+
Uid: int(m.UID),
475+
Gid: int(m.GID),
476+
Mode: m.Mode,
477+
Size: int64(len(m.Content)),
474478
}
475-
header := &tar.Header{Name: m.Name, Mode: 0644, Size: int64(len(bs))}
476479
if err = tw.WriteHeader(header); err != nil {
477480
log.Errorf(ctx, "[Copy] Error during writing tarball header: %v", err)
478481
return
479482
}
480-
if _, err = tw.Write(bs); err != nil {
483+
if _, err = tw.Write(m.Content); err != nil {
481484
log.Errorf(ctx, "[Copy] Error during writing tarball content: %v", err)
482485
return
483486
}
@@ -517,7 +520,6 @@ func (v *Vibranium) Send(opts *pb.SendOptions, stream pb.CoreRPC_SendServer) err
517520
return grpcstatus.Error(Send, err.Error())
518521
}
519522

520-
sendOpts.Data = opts.Data
521523
ch, err := v.cluster.Send(ctx, sendOpts)
522524
if err != nil {
523525
return grpcstatus.Error(Send, err.Error())

rpc/transform.go

+20-13
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,20 @@ func toCoreCopyOptions(b *pb.CopyOptions) *types.CopyOptions {
119119
}
120120

121121
func toCoreSendOptions(b *pb.SendOptions) (*types.SendOptions, error) { // nolint
122-
return &types.SendOptions{IDs: b.Ids}, nil
122+
files := []types.LinuxFile{}
123+
for filename, content := range b.Data {
124+
files = append(files, types.LinuxFile{
125+
Content: content,
126+
Filename: filename,
127+
UID: int(b.Owners[filename].GetUid()),
128+
GID: int(b.Owners[filename].GetGid()),
129+
Mode: b.Modes[filename].GetMode(),
130+
})
131+
}
132+
return &types.SendOptions{
133+
IDs: b.Ids,
134+
Files: files,
135+
}, nil
123136
}
124137

125138
func toCoreAddNodeOptions(b *pb.AddNodeOptions) *types.AddNodeOptions {
@@ -277,21 +290,15 @@ func toCoreDeployOptions(d *pb.DeployOptions) (*types.DeployOptions, error) {
277290

278291
var err error
279292
files := []types.LinuxFile{}
280-
for filename, bs := d.Data{
293+
for filename, bs := range d.Data {
281294
files = append(files, types.LinuxFile{
282-
Content: bs,
295+
Content: bs,
283296
Filename: filename,
284-
OwnerID: int(d.Owners[filename]),
285-
GroupID: int(d.Groups[filename]),
286-
Mode: d.Mode,
297+
UID: int(d.Owners[filename].GetUid()),
298+
GID: int(d.Owners[filename].GetGid()),
299+
Mode: d.Modes[filename].GetMode(),
287300
})
288301
}
289-
data := map[string]types.ReaderManager{}
290-
for filename, bs := range d.Data {
291-
if data[filename], err = types.NewReaderManager(bytes.NewBuffer(bs)); err != nil {
292-
return nil, err
293-
}
294-
}
295302

296303
vbsLimit, err := types.NewVolumeBindings(d.ResourceOpts.VolumesLimit)
297304
if err != nil {
@@ -346,7 +353,7 @@ func toCoreDeployOptions(d *pb.DeployOptions) (*types.DeployOptions, error) {
346353
IgnoreHook: d.IgnoreHook,
347354
AfterCreate: d.AfterCreate,
348355
RawArgs: d.RawArgs,
349-
Data: data,
356+
Files: files,
350357
}, nil
351358
}
352359

types/message.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package types
22

33
import (
44
"bytes"
5-
"io"
65
)
76

87
// RemoveWorkloadMessage for remove message
@@ -30,11 +29,10 @@ type BuildImageMessage struct {
3029

3130
// CopyMessage for copy message
3231
type CopyMessage struct {
33-
ID string `json:"id,omitempty"`
34-
Name string `json:"name,omitempty"`
35-
Path string `json:"path,omitempty"`
36-
Error error `json:"error,omitempty"`
37-
Data io.ReadCloser `json:"-"`
32+
ID string `json:"id,omitempty"`
33+
Path string `json:"path,omitempty"`
34+
Error error `json:"error,omitempty"`
35+
LinuxFile `json:"-"`
3836
}
3937

4038
// SendMessage for send message

types/options.go

+48-26
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,29 @@ import "github.com/pkg/errors"
77
// DeployOptions is options for deploying
88
type DeployOptions struct {
99
ResourceOpts ResourceOptions
10-
Name string // Name of application
11-
Entrypoint *Entrypoint // entrypoint
12-
Podname string // Name of pod to deploy
13-
NodeFilter NodeFilter // filter of nodenames, using includes or not using excludes
14-
Image string // Name of image to deploy
15-
ExtraArgs string // Extra arguments to append to command
16-
Count int // How many workloads needed, e.g. 4
17-
Env []string // Env for workload
18-
DNS []string // DNS for workload
19-
ExtraHosts []string // Extra hosts for workload
20-
Networks map[string]string // Network names and specified IPs
21-
User string // User for workload
22-
Debug bool // debug mode, use syslog as log driver
23-
OpenStdin bool // OpenStdin for workload
24-
Labels map[string]string // Labels for workloads
25-
DeployStrategy string // Deploy strategy
26-
Data map[string]ReaderManager // For additional file data
27-
NodesLimit int // Limit nodes count
28-
ProcessIdent string // ProcessIdent ident this deploy
29-
IgnoreHook bool // IgnoreHook ignore hook process
30-
AfterCreate []string // AfterCreate support run cmds after create
31-
RawArgs []byte // RawArgs for raw args processing
32-
Lambda bool // indicate is lambda workload or not
10+
Name string // Name of application
11+
Entrypoint *Entrypoint // entrypoint
12+
Podname string // Name of pod to deploy
13+
NodeFilter NodeFilter // filter of nodenames, using includes or not using excludes
14+
Image string // Name of image to deploy
15+
ExtraArgs string // Extra arguments to append to command
16+
Count int // How many workloads needed, e.g. 4
17+
Env []string // Env for workload
18+
DNS []string // DNS for workload
19+
ExtraHosts []string // Extra hosts for workload
20+
Networks map[string]string // Network names and specified IPs
21+
User string // User for workload
22+
Debug bool // debug mode, use syslog as log driver
23+
OpenStdin bool // OpenStdin for workload
24+
Labels map[string]string // Labels for workloads
25+
DeployStrategy string // Deploy strategy
26+
Files []LinuxFile // For additional file data
27+
NodesLimit int // Limit nodes count
28+
ProcessIdent string // ProcessIdent ident this deploy
29+
IgnoreHook bool // IgnoreHook ignore hook process
30+
AfterCreate []string // AfterCreate support run cmds after create
31+
RawArgs []byte // RawArgs for raw args processing
32+
Lambda bool // indicate is lambda workload or not
3333
}
3434

3535
// Processing tracks workloads count yet finished
@@ -80,18 +80,40 @@ func (o *CopyOptions) Validate() error {
8080
return nil
8181
}
8282

83+
// LinuxFile is used for copy file
84+
type LinuxFile struct {
85+
Content []byte
86+
Filename string
87+
UID int
88+
GID int
89+
Mode int64
90+
}
91+
92+
// Clone returns a copy of content bytes
93+
func (f LinuxFile) Clone() LinuxFile {
94+
c := make([]byte, len(f.Content))
95+
copy(c, f.Content)
96+
return LinuxFile{
97+
Content: c,
98+
Filename: f.Filename,
99+
UID: f.UID,
100+
GID: f.GID,
101+
Mode: f.Mode,
102+
}
103+
}
104+
83105
// SendOptions for send files to multiple workload
84106
type SendOptions struct {
85-
IDs []string
86-
Data map[string][]byte
107+
IDs []string
108+
Files []LinuxFile
87109
}
88110

89111
// Validate checks options
90112
func (o *SendOptions) Validate() error {
91113
if len(o.IDs) == 0 {
92114
return errors.WithStack(ErrNoWorkloadIDs)
93115
}
94-
if len(o.Data) == 0 {
116+
if len(o.Files) == 0 {
95117
return errors.WithStack(ErrNoFilesToSend)
96118
}
97119
return nil

0 commit comments

Comments
 (0)