Skip to content

Commit c28c175

Browse files
committed
introduce ReaderManager to address race condition in smaller critical
1 parent 94b8f94 commit c28c175

File tree

4 files changed

+39
-17
lines changed

4 files changed

+39
-17
lines changed

cluster/calcium/create.go

+5-10
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package calcium
33
import (
44
"context"
55
"fmt"
6-
"io"
76
"sync"
87

98
"github.com/projecteru2/core/cluster"
@@ -224,16 +223,12 @@ func (c *Calcium) doCreateAndStartContainer(
224223

225224
// Copy data to container
226225
if len(opts.Data) > 0 {
227-
for dst, src := range opts.Data {
228-
mutexSend := func() error {
229-
opts.Mux.Lock()
230-
defer opts.Mux.Unlock()
231-
if _, err = src.Seek(0, io.SeekStart); err != nil {
232-
return err
233-
}
234-
return c.doSendFileToContainer(ctx, node.Engine, container.ID, dst, src, true, true)
226+
for dst, readerManager := range opts.Data {
227+
reader, err := readerManager.GetReader()
228+
if err != nil {
229+
return err
235230
}
236-
if err = mutexSend(); err != nil {
231+
if err = c.doSendFileToContainer(ctx, node.Engine, container.ID, dst, reader, true, true); err != nil {
237232
return err
238233
}
239234
}

cluster/calcium/replace.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"context"
66
"errors"
77
"fmt"
8-
"io/ioutil"
98
"sync"
109

1110
"github.com/projecteru2/core/store"
@@ -121,11 +120,9 @@ func (c *Calcium) doReplaceContainer(
121120
if err != nil {
122121
return nil, removeMessage, err
123122
}
124-
bs, err := ioutil.ReadAll(stream)
125-
if err != nil {
123+
if opts.DeployOptions.Data[dst], err = types.NewReaderManager(stream); err != nil {
126124
return nil, removeMessage, err
127125
}
128-
opts.DeployOptions.Data[dst] = bytes.NewReader(bs)
129126
}
130127

131128
createMessage := &types.CreateContainerMessage{}

rpc/transform.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -262,9 +262,11 @@ func toCoreDeployOptions(d *pb.DeployOptions) (*types.DeployOptions, error) {
262262
return nil, err
263263
}
264264

265-
data := map[string]*bytes.Reader{}
265+
data := map[string]types.ReaderManager{}
266266
for filename, bs := range d.Data {
267-
data[filename] = bytes.NewReader(bs)
267+
if data[filename], err = types.NewReaderManager(bytes.NewBuffer(bs)); err != nil {
268+
return nil, err
269+
}
268270
}
269271

270272
return &types.DeployOptions{

types/options.go

+29-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package types
22

33
import (
44
"bytes"
5+
"io"
6+
"io/ioutil"
57
"sync"
68
)
79

@@ -30,7 +32,7 @@ type DeployOptions struct {
3032
Labels map[string]string // Labels for containers
3133
NodeLabels map[string]string // NodeLabels for filter node
3234
DeployMethod string // Deploy method
33-
Data map[string]*bytes.Reader // For additional file data
35+
Data map[string]ReaderManager // For additional file data
3436
SoftLimit bool // Soft limit memory
3537
NodesLimit int // Limit nodes count
3638
ProcessIdent string // ProcessIdent ident this deploy
@@ -42,6 +44,32 @@ type DeployOptions struct {
4244
Mux sync.Mutex // used for concurrent send during creation
4345
}
4446

47+
// ReaderManager return Reader under concurrency
48+
type ReaderManager interface {
49+
GetReader() (io.Reader, error)
50+
}
51+
52+
type readerManager struct {
53+
mux sync.Mutex
54+
r io.ReadSeeker
55+
}
56+
57+
func (rm *readerManager) GetReader() (_ io.Reader, err error) {
58+
rm.mux.Lock()
59+
defer rm.mux.Unlock()
60+
buf := &bytes.Buffer{}
61+
_, err = io.Copy(buf, rm.r)
62+
_, err = rm.r.Seek(0, io.SeekStart)
63+
return buf, nil
64+
}
65+
66+
func NewReaderManager(r io.Reader) (ReaderManager, error) {
67+
bs, err := ioutil.ReadAll(r)
68+
return &readerManager{
69+
r: bytes.NewReader(bs),
70+
}, err
71+
}
72+
4573
// Normalize keeps deploy options consistant
4674
func (o *DeployOptions) Normalize() {
4775
o.Storage += o.Volumes.TotalSize()

0 commit comments

Comments
 (0)