
Commit 7a3f6f5

intro goroutine pool to speed up engine API call (#343)

* intro goroutine pool to speed up engine API call
* make MaxConcurrency configurable
* ensure transactions go well

1 parent cd5681d · commit 7a3f6f5

7 files changed: +106 −31 lines

cluster/calcium/calcium_test.go (+2 −1)

@@ -49,7 +49,8 @@ func NewTestCluster() *Calcium {
 			MaxShare:  -1,
 			ShareBase: 100,
 		},
-		WALFile: filepath.Join(walDir, "core.wal.log"),
+		WALFile:        filepath.Join(walDir, "core.wal.log"),
+		MaxConcurrency: 10,
 	}
 	c.store = &storemocks.Store{}
 	c.scheduler = &schedulermocks.Scheduler{}

cluster/calcium/create.go (+25 −3)

@@ -16,6 +16,7 @@ import (
 	"github.com/projecteru2/core/types"
 	"github.com/projecteru2/core/utils"
 	"github.com/sanity-io/litter"
+	"golang.org/x/sync/semaphore"
 )

 // CreateWorkload use options to create workloads
@@ -142,6 +143,7 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateWo
 	}

 	wg.Wait()
+	log.Debugf("[Calcium.doDeployWorkloads] rollbackMap: %+v", rollbackMap)
 	return rollbackMap, errors.WithStack(err)
 }

@@ -155,21 +157,34 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
 		return utils.Range(deploy), errors.WithStack(err)
 	}

+	sem, appendLock := semaphore.NewWeighted(c.config.MaxConcurrency), sync.Mutex{}
 	for idx := 0; idx < deploy; idx++ {
 		createMsg := &types.CreateWorkloadMessage{
 			Podname:  opts.Podname,
 			Nodename: nodename,
 			Publish:  map[string][]string{},
 		}

-		do := func(idx int) (e error) {
+		if e := sem.Acquire(ctx, 1); e != nil {
+			log.Errorf("[Calcium.doDeployWorkloadsOnNode] Failed to acquire semaphore: %+v", e)
+			err = e
+			ch <- &types.CreateWorkloadMessage{Error: e}
+			appendLock.Lock()
+			indices = append(indices, idx)
+			appendLock.Unlock()
+			continue
+		}
+		go func(idx int) (e error) {
 			defer func() {
 				if e != nil {
 					err = e
 					createMsg.Error = e
+					appendLock.Lock()
 					indices = append(indices, idx)
+					appendLock.Unlock()
 				}
 				ch <- createMsg
+				sem.Release(1)
 			}()

 			r := &types.ResourceMeta{}
@@ -186,10 +201,17 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
 			createMsg.ResourceMeta = *r
 			createOpts := c.doMakeWorkloadOptions(seq+idx, createMsg, opts, node)
 			return c.doDeployOneWorkload(ctx, node, opts, createMsg, createOpts, deploy-1-idx)
-		}
-		_ = do(idx)
+		}(idx) // nolint:errcheck
 	}

+	// sem.Acquire(ctx, MaxConcurrency) is equivalent to WaitGroup.Wait()
+	// context.Background() is used so the semantics cannot be broken: we must wait for every goroutine to finish, and the user's ctx must not cut that short
+	// otherwise this function could return and close the channel before some goroutines have run their defers, and those defers would panic sending on the closed channel
+	if e := sem.Acquire(context.Background(), c.config.MaxConcurrency); e != nil {
+		log.Errorf("[Calcium.doDeployWorkloadsOnNode] Failed to wait all workers done: %+v", e)
+		err = e
+		indices = utils.Range(deploy)
+	}
 	return indices, errors.WithStack(err)
 }
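The heart of this change is the weighted semaphore doing double duty: Acquire(ctx, 1) before each spawn bounds how many goroutines hit the engine API at once, and acquiring the full MaxConcurrency weight afterwards is the moral equivalent of WaitGroup.Wait(). A minimal, self-contained sketch of the same pattern; the names (maxConcurrency, the print body) are illustrative, not taken from this repo:

    package main

    import (
        "context"
        "fmt"

        "golang.org/x/sync/semaphore"
    )

    func main() {
        const maxConcurrency = 4
        sem := semaphore.NewWeighted(maxConcurrency)

        for i := 0; i < 10; i++ {
            // Blocks while maxConcurrency workers are already running.
            if err := sem.Acquire(context.Background(), 1); err != nil {
                break
            }
            go func(i int) {
                defer sem.Release(1) // free a slot when this worker is done
                fmt.Println("deploying workload", i)
            }(i)
        }

        // Acquiring the full weight succeeds only after every worker has
        // released its slot, i.e. all goroutines have finished; using
        // context.Background() here mirrors the commit's reasoning above.
        if err := sem.Acquire(context.Background(), maxConcurrency); err != nil {
            fmt.Println("wait failed:", err)
        }
    }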

cluster/calcium/create_test.go (+62 −15)

@@ -95,7 +95,64 @@ func TestCreateWorkloadTxn(t *testing.T) {
 		},
 	}

-	store := c.store.(*storemocks.Store)
+	store := &storemocks.Store{}
+	sche := &schedulermocks.Scheduler{}
+	scheduler.InitSchedulerV1(sche)
+	c.store = store
+	c.scheduler = sche
+	engine := &enginemocks.API{}
+
+	node1 := &types.Node{
+		NodeMeta: types.NodeMeta{
+			Name: "n1",
+		},
+		Engine: engine,
+	}
+	node2 := &types.Node{
+		NodeMeta: types.NodeMeta{
+			Name: "n2",
+		},
+		Engine: engine,
+	}
+	nodes = []*types.Node{node1, node2}
+
+	store.On("SaveProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+	// doAllocResource fails: MakeDeployStatus
+	lock := &lockmocks.DistributedLock{}
+	lock.On("Lock", mock.Anything).Return(context.Background(), nil)
+	lock.On("Unlock", mock.Anything).Return(nil)
+	store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
+	store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil)
+	store.On("GetNode",
+		mock.AnythingOfType("*context.emptyCtx"),
+		mock.AnythingOfType("string"),
+	).Return(
+		func(_ context.Context, name string) (node *types.Node) {
+			node = node1
+			if name == "n2" {
+				node = node2
+			}
+			return
+		}, nil)
+	sche.On("SelectStorageNodes", mock.AnythingOfType("[]resourcetypes.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
+		return scheduleInfos
+	}, len(nodes), nil)
+	sche.On("SelectStorageNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("int64")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ int64) []resourcetypes.ScheduleInfo {
+		return scheduleInfos
+	}, len(nodes), nil)
+	sche.On("SelectVolumeNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("types.VolumeBindings")).Return(func(scheduleInfos []resourcetypes.ScheduleInfo, _ types.VolumeBindings) []resourcetypes.ScheduleInfo {
+		return scheduleInfos
+	}, nil, len(nodes), nil)
+	sche.On("SelectMemoryNodes", mock.AnythingOfType("[]types.ScheduleInfo"), mock.AnythingOfType("float64"), mock.AnythingOfType("int64")).Return(
+		func(scheduleInfos []resourcetypes.ScheduleInfo, _ float64, _ int64) []resourcetypes.ScheduleInfo {
+			for i := range scheduleInfos {
+				scheduleInfos[i].Capacity = 1
+			}
+			return scheduleInfos
+		}, len(nodes), nil)
 	store.On("MakeDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(
 		errors.Wrap(context.DeadlineExceeded, "MakeDeployStatus"),
 	).Once()
@@ -133,7 +190,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
 		assert.Error(t, m.Error, "UpdateNodes1")
 	}
 	assert.EqualValues(t, 1, cnt)
-	node1, node2 := nodes[0], nodes[1]
+	node1, node2 = nodes[0], nodes[1]
 	assert.EqualValues(t, 1, node1.CPUUsed)
 	assert.EqualValues(t, 1, node2.CPUUsed)
 	node1.CPUUsed = 0
@@ -153,18 +210,7 @@ func TestCreateWorkloadTxn(t *testing.T) {
 		}
 		return
 	}, nil)
-	store.On("GetNode",
-		mock.AnythingOfType("*context.cancelCtx"),
-		mock.AnythingOfType("string"),
-	).Return(
-		func(_ context.Context, name string) (node *types.Node) {
-			node = node1
-			if name == "n2" {
-				node = node2
-			}
-			return
-		}, nil)
-	engine := node1.Engine.(*enginemocks.API)
+	engine = node1.Engine.(*enginemocks.API)
 	engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImageLocalDigest")).Twice()
 	engine.On("ImagePull", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImagePull")).Twice()
 	store.On("UpdateProcessing", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
@@ -241,7 +287,8 @@ func TestCreateWorkloadTxn(t *testing.T) {
 	assert.EqualValues(t, 2, cnt)
 	assert.EqualValues(t, 1, errCnt)
 	assert.EqualValues(t, 1, node1.CPUUsed+node2.CPUUsed)
-	return
+	store.AssertExpectations(t)
+	engine.AssertExpectations(t)
 }

 func newCreateWorkloadCluster(t *testing.T) (*Calcium, []*types.Node) {
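A detail worth noting in the rewritten mocks: the values passed to Return for GetNode and the Select*Nodes expectations are functions, and mockery-generated mocks invoke such a function with the actual call arguments to compute the return value per call. A hand-rolled sketch of that convention; the Store type and GetNode signature below are simplified stand-ins, not the real interfaces:

    package main

    import (
        "fmt"

        "github.com/stretchr/testify/mock"
    )

    type Store struct{ mock.Mock }

    // GetNode mimics a mockery-generated method: if the registered return
    // value is a function, call it with the arguments instead of returning
    // it verbatim.
    func (s *Store) GetNode(name string) (string, error) {
        ret := s.Called(name)
        if rf, ok := ret.Get(0).(func(string) string); ok {
            return rf(name), ret.Error(1)
        }
        return ret.Get(0).(string), ret.Error(1)
    }

    func main() {
        s := &Store{}
        s.On("GetNode", mock.AnythingOfType("string")).Return(
            func(name string) string {
                if name == "n2" {
                    return "node2"
                }
                return "node1"
            }, nil)
        n, _ := s.GetNode("n2")
        fmt.Println(n) // node2
    }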

core.yaml.sample (+1)

@@ -6,6 +6,7 @@ global_timeout: 300s
 lock_timeout: 30s
 cert_path: "/etc/eru/tls"
 sentry_dsn: "https://examplePublicKey@o0.ingest.sentry.io/0"
+max_concurrency: 20

 auth:
   username: admin

engine/docker/container.go (+5 −3)

@@ -68,10 +68,12 @@ func (e *Engine) VirtualizationCreate(ctx context.Context, opts *enginetypes.Vir
 	// when there are no networks, use the networkmode value
 	// when there are networks, always use the networks value as the mode
 	var networkMode dockercontainer.NetworkMode
-	for name := range opts.Networks {
+	networks := map[string]string{}
+	for name, network := range opts.Networks {
 		networkMode = dockercontainer.NetworkMode(name)
+		networks[name] = network
 		if networkMode.IsHost() {
-			opts.Networks[name] = ""
+			networks[name] = ""
 		}
 	}
 	// if there is no network, fall back to the default
@@ -198,7 +200,7 @@ func (e *Engine) VirtualizationCreate(ctx context.Context, opts *enginetypes.Vir
 	networkConfig := &dockernetwork.NetworkingConfig{
 		EndpointsConfig: map[string]*dockernetwork.EndpointSettings{},
 	}
-	for networkID, ipv4 := range opts.Networks {
+	for networkID, ipv4 := range networks {
 		endpointSetting, err := e.makeIPV4EndpointSetting(ipv4)
 		if err != nil {
 			return r, err
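The reason this hunk copies opts.Networks into a local networks map before blanking host-network IPs: Go maps are shared references, so writing through opts.Networks mutates the caller's options, which matters once the same opts may be reused or touched by concurrent deploys. A small demonstration of the aliasing; the Options type and helper functions here are hypothetical:

    package main

    import "fmt"

    type Options struct {
        Networks map[string]string
    }

    func createWithMutation(opts Options) {
        for name := range opts.Networks {
            opts.Networks[name] = "" // visible to the caller!
        }
    }

    func createWithCopy(opts Options) {
        networks := map[string]string{}
        for name, v := range opts.Networks {
            networks[name] = v
        }
        for name := range networks {
            networks[name] = "" // caller's map is untouched
        }
    }

    func main() {
        opts := Options{Networks: map[string]string{"host": "10.0.0.1"}}
        createWithCopy(opts)
        fmt.Println(opts.Networks["host"]) // "10.0.0.1"
        createWithMutation(opts)
        fmt.Println(opts.Networks["host"]) // ""
    }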

go.mod (+1)

@@ -42,6 +42,7 @@ require (
 	go.uber.org/automaxprocs v1.3.0
 	golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de
 	golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc
+	golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
 	google.golang.org/grpc v1.29.1
 	google.golang.org/protobuf v1.23.0
 	gopkg.in/alexcesaro/statsd.v2 v2.0.0 // indirect
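For reference, a pinned pseudo-version like this can be added with the standard module-aware toolchain:

    go get golang.org/x/sync@v0.0.0-20190911185100-cd5d95a43a6e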

types/config.go (+10 −9)

@@ -6,15 +6,16 @@ import (

 // Config holds eru-core config
 type Config struct {
-	LogLevel      string        `yaml:"log_level" required:"true" default:"INFO"`
-	Bind          string        `yaml:"bind" required:"true" default:"5001"`           // HTTP API address
-	LockTimeout   time.Duration `yaml:"lock_timeout" required:"true" default:"30s"`    // timeout for lock (ttl)
-	GlobalTimeout time.Duration `yaml:"global_timeout" required:"true" default:"300s"` // timeout for remove, run_and_wait and build, in second
-	Statsd        string        `yaml:"statsd"`    // statsd host and port
-	Profile       string        `yaml:"profile"`   // profile ip:port
-	CertPath      string        `yaml:"cert_path"` // docker cert files path
-	Auth          AuthConfig    `yaml:"auth"`      // grpc auth
-	GRPCConfig    GRPCConfig    `yaml:"grpc"`      // grpc config
+	LogLevel       string        `yaml:"log_level" required:"true" default:"INFO"`
+	Bind           string        `yaml:"bind" required:"true" default:"5001"`           // HTTP API address
+	LockTimeout    time.Duration `yaml:"lock_timeout" required:"true" default:"30s"`    // timeout for lock (ttl)
+	GlobalTimeout  time.Duration `yaml:"global_timeout" required:"true" default:"300s"` // timeout for remove, run_and_wait and build, in second
+	Statsd         string        `yaml:"statsd"`                       // statsd host and port
+	Profile        string        `yaml:"profile"`                      // profile ip:port
+	CertPath       string        `yaml:"cert_path"`                    // docker cert files path
+	MaxConcurrency int64         `yaml:"max_concurrency" default:"20"` // max number of concurrent calls to a single runtime
+	Auth           AuthConfig    `yaml:"auth"`                         // grpc auth
+	GRPCConfig     GRPCConfig    `yaml:"grpc"`                         // grpc config

 	WALFile        string        `yaml:"wal_file" required:"true" default:"core.wal"`   // WAL file path
 	WALOpenTimeout time.Duration `yaml:"wal_open_timeout" required:"true" default:"8s"` // timeout for opening a WAL file
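For a sense of how max_concurrency travels from core.yaml into semaphore.NewWeighted(c.config.MaxConcurrency): a minimal sketch using gopkg.in/yaml.v2 with the default applied by hand. The project's own loader presumably honors the default:"20" struct tag; the Load function below is a hypothetical stand-in:

    package main

    import (
        "fmt"

        "gopkg.in/yaml.v2"
    )

    type Config struct {
        MaxConcurrency int64 `yaml:"max_concurrency"`
    }

    func Load(data []byte) (Config, error) {
        c := Config{MaxConcurrency: 20} // default from the struct tag
        // yaml.v2 leaves fields untouched when the key is absent,
        // so the pre-set default survives.
        err := yaml.Unmarshal(data, &c)
        return c, err
    }

    func main() {
        c, err := Load([]byte("max_concurrency: 8\n"))
        if err != nil {
            panic(err)
        }
        fmt.Println(c.MaxConcurrency) // 8; would print 20 if the key were absent
    }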
