
Commit ce50438: refactor stage 2 (1 parent: 4fe8207)


71 files changed, +2159 -2205 lines

Makefile (+2 -2)

```diff
@@ -48,7 +48,7 @@ cloc:

 unit-test:
 	go vet `go list ./... | grep -v '/vendor/' | grep -v '/tools'` && \
-	go test -race -timeout 240s -count=1 -vet=off -cover ./utils/... \
+	go test -race -timeout 600s -count=1 -vet=off -cover ./utils/... \
 	./types/... \
 	./store/etcdv3/. \
 	./store/etcdv3/embedded/. \
@@ -67,7 +67,7 @@ unit-test:
 	./wal/kv/. \
 	./store/redis/... \
 	./lock/redis/... && \
-	go test -timeout 240s -count=1 -cover ./cluster/calcium/...
+	go test -timeout 600s -count=1 -cover ./cluster/calcium/...

 lint:
 	golangci-lint run
```

client/clientpool.go (+1 -1)

```diff
@@ -106,12 +106,12 @@ func checkAlive(ctx context.Context, rpc *clientWithStatus, timeout time.Duration)

 func (c *Pool) updateClientsStatus(ctx context.Context, timeout time.Duration) {
 	wg := &sync.WaitGroup{}
+	defer wg.Wait()
 	for _, rpc := range c.rpcClients {
 		wg.Add(1)
 		go func(r *clientWithStatus) {
 			defer wg.Done()
 			r.alive = checkAlive(ctx, r, timeout)
 		}(rpc)
 	}
-	wg.Wait()
 }
```
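This change is the recurring theme of the commit: `wg.Wait()` moves into a defer registered right after the WaitGroup is created, so the wait runs on every return path. A minimal sketch of the pattern; the names `ping` and `probeAll` are illustrative, not from the repo:

```go
package main

import (
	"fmt"
	"sync"
)

// ping stands in for checkAlive: any per-item probe works the same way.
func ping(n int) bool { return n%2 == 0 }

func probeAll(items []int) {
	wg := &sync.WaitGroup{}
	// Deferring the Wait right after creating the WaitGroup guarantees it
	// runs on every return path, so an early return added later can never
	// leak running goroutines.
	defer wg.Wait()
	for _, it := range items {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			fmt.Println(n, ping(n))
		}(it)
	}
}

func main() {
	probeAll([]int{1, 2, 3})
}
```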

cluster/calcium/build.go (+14 -9)

```diff
@@ -61,16 +61,12 @@ func (c *Calcium) selectBuildNode(ctx context.Context) (*types.Node, error) {
 		return nil, errors.WithStack(types.ErrNoBuildPod)
 	}

-	// get node by scheduler
-	ch, err := c.ListPodNodes(ctx, &types.ListNodesOptions{Podname: c.config.Docker.BuildPod})
+	// get nodes
+	nodes, err := c.store.GetNodesByPod(ctx, c.config.Docker.BuildPod, nil, false)
 	if err != nil {
 		return nil, err
 	}

-	nodes := []*types.Node{}
-	for n := range ch {
-		nodes = append(nodes, n)
-	}
 	if len(nodes) == 0 {
 		return nil, errors.WithStack(types.ErrInsufficientNodes)
 	}
@@ -79,14 +75,14 @@ func (c *Calcium) selectBuildNode(ctx context.Context) (*types.Node, error) {
 }

 func (c *Calcium) getMostIdleNode(ctx context.Context, nodes []*types.Node) (*types.Node, error) {
-	nodeNames := []string{}
+	nodenames := []string{}
 	nodeMap := map[string]*types.Node{}
 	for _, node := range nodes {
-		nodeNames = append(nodeNames, node.Name)
+		nodenames = append(nodenames, node.Name)
 		nodeMap[node.Name] = node
 	}

-	mostIdleNode, err := c.rmgr.GetMostIdleNode(ctx, nodeNames)
+	mostIdleNode, err := c.rmgr.GetMostIdleNode(ctx, nodenames)
 	if err != nil {
 		return nil, err
 	}
@@ -194,6 +190,15 @@ func (c *Calcium) pushImageAndClean(ctx context.Context, resp io.ReadCloser, nod

 }

+func (c *Calcium) getWorkloadNode(ctx context.Context, id string) (*types.Node, error) {
+	w, err := c.store.GetWorkload(ctx, id)
+	if err != nil {
+		return nil, err
+	}
+	node, err := c.store.GetNode(ctx, w.Nodename)
+	return node, err
+}
+
 func withImageBuiltChannel(f func(chan *types.BuildImageMessage)) chan *types.BuildImageMessage {
 	ch := make(chan *types.BuildImageMessage)
 	utils.SentryGo(func() {
```
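selectBuildNode previously drained a channel from ListPodNodes into a slice; the refactor asks the store for the slice directly (GetNodesByPod, shown above). A hedged before/after sketch of that shape, with simplified types that are not the repo's:

```go
package main

import "fmt"

// Before the refactor: every caller drained a channel into a slice.
func collect(ch <-chan string) []string {
	nodes := []string{}
	for n := range ch {
		nodes = append(nodes, n)
	}
	return nodes
}

// After: the source hands back a slice, so callers only check its length.
func pick(nodes []string) (string, error) {
	if len(nodes) == 0 {
		return "", fmt.Errorf("insufficient nodes")
	}
	return nodes[0], nil
}

func main() {
	ch := make(chan string, 2)
	ch <- "n1"
	ch <- "n2"
	close(ch)
	fmt.Println(pick(collect(ch)))
}
```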

cluster/calcium/calcium.go (+1 -1)

```diff
@@ -106,7 +106,7 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, error)
 		return nil, logger.ErrWithTracing(nil, errors.WithStack(err)) //nolint
 	}

-	go cal.InitMetrics()
+	go cal.InitMetrics(ctx)

 	return cal, logger.ErrWithTracing(nil, errors.WithStack(err)) //nolint
 }
```
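Passing ctx into InitMetrics lets the metrics goroutine observe cancellation instead of running unconditionally. The body of InitMetrics isn't shown in this diff, so this is only a plausible sketch of a context-aware background loop; `initMetrics` here is a hypothetical stand-in:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// initMetrics is a hypothetical stand-in: a periodic task that stops
// cleanly once the caller's context is cancelled.
func initMetrics(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("metrics loop stopped:", ctx.Err())
			return
		case <-ticker.C:
			// collect and report metrics here
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go initMetrics(ctx)
	time.Sleep(30 * time.Millisecond)
	cancel()
	time.Sleep(10 * time.Millisecond) // give the loop a moment to exit
}
```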

cluster/calcium/calcium_test.go (+1 -1)

```diff
@@ -41,7 +41,7 @@ func NewTestCluster() *Calcium {
 			ServiceDiscoveryPushInterval: 15 * time.Second,
 		},
 		WALFile:             filepath.Join(walDir, "core.wal.log"),
-		MaxConcurrency:      10,
+		MaxConcurrency:      100000,
 		HAKeepaliveInterval: 16 * time.Second,
 	}
 	c.store = &storemocks.Store{}
```

cluster/calcium/capacity.go (+18 -19)

```diff
@@ -8,48 +8,47 @@ import (
 	"github.com/projecteru2/core/strategy"
 	"github.com/projecteru2/core/types"
 	"github.com/sanity-io/litter"
+	"golang.org/x/exp/maps"

 	"github.com/pkg/errors"
 )

 // CalculateCapacity calculates capacity
 func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptions) (*types.CapacityMessage, error) {
 	logger := log.WithField("Calcium", "CalculateCapacity").WithField("opts", opts)
+	logger.Infof(ctx, "[CalculateCapacity] Calculate capacity with options:\n%s", litter.Options{Compact: true}.Sdump(opts))
 	var err error
-	log.Infof(ctx, "[CalculateCapacity] Calculate capacity with options:\n%s", litter.Options{Compact: true}.Sdump(opts))
 	msg := &types.CapacityMessage{
 		Total:          0,
 		NodeCapacities: map[string]int{},
 	}

 	return msg, c.withNodesPodLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) error {
-		nodes := []string{}
-		for node := range nodeMap {
-			nodes = append(nodes, node)
-		}
+		nodenames := maps.Keys(nodeMap)

 		if opts.DeployStrategy != strategy.Dummy {
-			if msg.NodeCapacities, err = c.doGetDeployMap(ctx, nodes, opts); err != nil {
+			if msg.NodeCapacities, err = c.doGetDeployStrategy(ctx, nodenames, opts); err != nil {
 				logger.Errorf(ctx, "[Calcium.CalculateCapacity] doGetDeployMap failed: %+v", err)
 				return err
 			}

 			for _, capacity := range msg.NodeCapacities {
 				msg.Total += capacity
 			}
-		} else {
-			var infos map[string]*resources.NodeCapacityInfo
-			infos, msg.Total, err = c.rmgr.GetNodesDeployCapacity(ctx, nodes, opts.ResourceOpts)
-			if err != nil {
-				logger.Errorf(ctx, "[Calcium.CalculateCapacity] failed to get nodes capacity: %+v", err)
-				return err
-			}
-			if msg.Total <= 0 {
-				return errors.Wrap(types.ErrInsufficientRes, "no node meets all the resource requirements at the same time")
-			}
-			for node, info := range infos {
-				msg.NodeCapacities[node] = info.Capacity
-			}
+			return nil
+		}
+
+		var infos map[string]*resources.NodeCapacityInfo
+		infos, msg.Total, err = c.rmgr.GetNodesDeployCapacity(ctx, nodenames, opts.ResourceOpts)
+		if err != nil {
+			logger.Errorf(ctx, "[Calcium.CalculateCapacity] failed to get nodes capacity: %+v", err)
+			return err
+		}
+		if msg.Total <= 0 {
+			return errors.Wrap(types.ErrInsufficientRes, "no node meets all the resource requirements at the same time")
+		}
+		for node, info := range infos {
+			msg.NodeCapacities[node] = info.Capacity
 		}
 		return nil
 	})
```
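maps.Keys from golang.org/x/exp/maps replaces the hand-rolled key-collection loop. It returns the keys as a new slice in unspecified order, exactly like the range loop it replaces:

```go
package main

import (
	"fmt"

	"golang.org/x/exp/maps"
)

func main() {
	nodeMap := map[string]int{"n1": 1, "n2": 2}

	// The loop this commit deletes:
	nodes := []string{}
	for node := range nodeMap {
		nodes = append(nodes, node)
	}

	// The one-liner that replaces it; order is unspecified in both cases.
	nodenames := maps.Keys(nodeMap)

	fmt.Println(len(nodes) == len(nodenames)) // true
}
```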

cluster/calcium/capacity_test.go (+47 -28)

```diff
@@ -2,7 +2,6 @@ package calcium

 import (
 	"context"
-	"errors"
 	"testing"

 	enginemocks "github.com/projecteru2/core/engine/mocks"
@@ -21,66 +20,86 @@ func TestCalculateCapacity(t *testing.T) {
 	c := NewTestCluster()
 	ctx := context.Background()
 	store := c.store.(*storemocks.Store)
-	rmgr := c.rmgr.(*resourcemocks.Manager)
-	rmgr.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, nil)
-	engine := &enginemocks.API{}

-	// pod1 := &types.Pod{Name: "p1"}
+	lock := &lockmocks.DistributedLock{}
+	lock.On("Lock", mock.Anything).Return(ctx, nil)
+	lock.On("Unlock", mock.Anything).Return(nil)
+	store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
+
+	engine := &enginemocks.API{}
+	name := "n1"
 	node1 := &types.Node{
 		NodeMeta: types.NodeMeta{
-			Name: "n1",
+			Name: name,
 		},
 		Engine: engine,
 	}
 	store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil)
-	lock := &lockmocks.DistributedLock{}
-	lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
-	lock.On("Unlock", mock.Anything).Return(nil)
-	store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
-	// failed by call plugin
+
 	opts := &types.DeployOptions{
 		Entrypoint: &types.Entrypoint{
 			Name: "entry",
 		},
 		ResourceOpts:   types.WorkloadResourceOpts{},
 		DeployStrategy: strategy.Auto,
 		NodeFilter: types.NodeFilter{
-			Includes: []string{"n1"},
+			Includes: []string{name},
 		},
 		Count: 3,
 	}
-	rmgr.On("GetNodesDeployCapacity", mock.Anything, mock.Anything, mock.Anything).Return(nil, 0, errors.New("not implemented")).Times(3)
+
+	// failed by call plugin
+	rmgr := c.rmgr.(*resourcemocks.Manager)
+	rmgr.On("GetNodesDeployCapacity", mock.Anything, mock.Anything, mock.Anything).Return(nil, 0, types.ErrNoETCD).Once()
 	_, err := c.CalculateCapacity(ctx, opts)
 	assert.Error(t, err)

 	// failed by get deploy status
+	nrim := map[string]*resources.NodeCapacityInfo{
+		name: {
+			NodeName: name,
+			Capacity: 10,
+			Usage:    0.5,
+			Rate:     0.5,
+			Weight:   100,
+		},
+	}
+	rmgr.On("GetNodesDeployCapacity", mock.Anything, mock.Anything, mock.Anything).Return(
+		nrim, 100, nil).Times(3)
 	store.On("GetDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
 	_, err = c.CalculateCapacity(ctx, opts)
 	assert.Error(t, err)
-	store.On("GetDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(map[string]int{"n1": 0}, nil)

-	// failed by get deploy plan
-	opts.DeployStrategy = "FAKE"
+	// failed by get deploy strategy
+	store.On("GetDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(map[string]int{name: 0}, nil)
+	opts.Count = -1
 	_, err = c.CalculateCapacity(ctx, opts)
 	assert.Error(t, err)

+	// success
+	opts.Count = 1
+	_, err = c.CalculateCapacity(ctx, opts)
+	assert.NoError(t, err)
+
 	// strategy: dummy
 	opts.DeployStrategy = strategy.Dummy
+
+	// failed by GetNodesDeployCapacity
+	rmgr.On("GetNodesDeployCapacity", mock.Anything, mock.Anything, mock.Anything).Return(nil, 0, types.ErrNoETCD).Once()
+	_, err = c.CalculateCapacity(ctx, opts)
+	assert.Error(t, err)
+
+	// failed by total <= 0
+	rmgr.On("GetNodesDeployCapacity", mock.Anything, mock.Anything, mock.Anything).Return(nil, -1, nil).Once()
+	_, err = c.CalculateCapacity(ctx, opts)
+	assert.Error(t, err)
+
+	// success
 	rmgr.On("GetNodesDeployCapacity", mock.Anything, mock.Anything, mock.Anything).Return(
-		map[string]*resources.NodeCapacityInfo{
-			"n1": {
-				NodeName: "n1",
-				Capacity: 10,
-				Usage:    0.5,
-				Rate:     0.5,
-				Weight:   100,
-			},
-		},
-		10, nil,
-	)
+		nrim, 10, nil)
 	msg, err := c.CalculateCapacity(ctx, opts)
 	assert.NoError(t, err)
-	assert.Equal(t, msg.NodeCapacities["n1"], 10)
+	assert.Equal(t, msg.NodeCapacities[name], 10)
 	assert.Equal(t, msg.Total, 10)

 	rmgr.AssertExpectations(t)
```
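The rewritten test leans on testify's call-count limits: a stub registered with .Once() or .Times(n) only answers that many matching calls, after which the next registration for the same method takes over. A minimal sketch with a hand-written mock; `Fetcher` is illustrative, not from the repo:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/stretchr/testify/mock"
)

// Fetcher is a hand-written testify mock, just for illustration.
type Fetcher struct{ mock.Mock }

func (f *Fetcher) Fetch() (int, error) {
	args := f.Called()
	return args.Int(0), args.Error(1)
}

func main() {
	f := &Fetcher{}
	// The first matching call fails once...
	f.On("Fetch").Return(0, errors.New("boom")).Once()
	// ...then this registration serves all later calls.
	f.On("Fetch").Return(42, nil)

	_, err := f.Fetch()
	fmt.Println("first:", err) // first: boom
	n, _ := f.Fetch()
	fmt.Println("second:", n) // second: 42
}
```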

cluster/calcium/control.go (+1 -1)

```diff
@@ -22,6 +22,7 @@ func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, f
 		defer close(ch)
 		wg := &sync.WaitGroup{}
 		wg.Add(len(ids))
+		defer wg.Wait()
 		for _, id := range ids {
 			id := id
 			_ = c.pool.Invoke(func() {
@@ -59,7 +60,6 @@ func (c *Calcium) ControlWorkload(ctx context.Context, ids []string, t string, f
 				}
 			})
 		}
-		wg.Wait()
 	})

 	return ch, nil
```
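With `defer close(ch)` registered before `defer wg.Wait()`, the LIFO order of deferred calls means the Wait completes before the channel closes, so no worker can ever send on a closed channel. A minimal sketch of that ordering, with illustrative names:

```go
package main

import (
	"fmt"
	"sync"
)

func produce(ids []int) <-chan int {
	ch := make(chan int)
	go func() {
		// Deferred calls run last-in-first-out: wg.Wait() (registered
		// second) runs first, then close(ch). The channel can never be
		// closed while a worker still wants to send on it.
		defer close(ch)
		wg := &sync.WaitGroup{}
		wg.Add(len(ids))
		defer wg.Wait()
		for _, id := range ids {
			id := id
			go func() {
				defer wg.Done()
				ch <- id * 2
			}()
		}
	}()
	return ch
}

func main() {
	for v := range produce([]int{1, 2, 3}) {
		fmt.Println(v)
	}
}
```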

cluster/calcium/control_test.go (+3 -3)

```diff
@@ -21,7 +21,7 @@ func TestControlStart(t *testing.T) {
 	ctx := context.Background()
 	store := c.store.(*storemocks.Store)
 	lock := &lockmocks.DistributedLock{}
-	lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
+	lock.On("Lock", mock.Anything).Return(ctx, nil)
 	lock.On("Unlock", mock.Anything).Return(nil)
 	store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
 	// failed by GetWorkloads
@@ -102,7 +102,7 @@ func TestControlStop(t *testing.T) {
 	ctx := context.Background()
 	store := c.store.(*storemocks.Store)
 	lock := &lockmocks.DistributedLock{}
-	lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
+	lock.On("Lock", mock.Anything).Return(ctx, nil)
 	lock.On("Unlock", mock.Anything).Return(nil)
 	store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
 	workload := &types.Workload{
@@ -146,7 +146,7 @@ func TestControlRestart(t *testing.T) {
 	ctx := context.Background()
 	store := c.store.(*storemocks.Store)
 	lock := &lockmocks.DistributedLock{}
-	lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
+	lock.On("Lock", mock.Anything).Return(ctx, nil)
 	lock.On("Unlock", mock.Anything).Return(nil)
 	store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
 	engine := &enginemocks.API{}
```

cluster/calcium/copy.go (+2 -3)

```diff
@@ -21,12 +21,12 @@ func (c *Calcium) Copy(ctx context.Context, opts *types.CopyOptions) (chan *type
 		defer close(ch)

 		wg := sync.WaitGroup{}
+		wg.Add(len(opts.Targets))
+		defer wg.Wait()
 		log.Infof(ctx, "[Copy] Copy %d workloads files", len(opts.Targets))

 		// workload one by one
 		for id, paths := range opts.Targets {
-			wg.Add(1)
-
 			utils.SentryGo(func(id string, paths []string) func() {
 				return func() {
 					defer wg.Done()
@@ -61,7 +61,6 @@ func (c *Calcium) Copy(ctx context.Context, opts *types.CopyOptions) (chan *type
 				}
 			}(id, paths))
 		}
-		wg.Wait()
 	})
 	return ch, nil
 }
```
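Two details are worth noting in this hunk: `wg.Add(len(opts.Targets))` registers all workers up front, which is safe because the map's size is known before any goroutine starts, and the immediately-invoked closure factory pins each iteration's `id` and `paths`, which mattered before Go 1.22 changed loop-variable scoping. A sketch of the same shape; `sentryGo` is a hypothetical stand-in for utils.SentryGo:

```go
package main

import (
	"fmt"
	"sync"
)

// sentryGo is a hypothetical stand-in for utils.SentryGo: run a task in
// a goroutine (the real helper also reports panics to Sentry).
func sentryGo(f func()) { go f() }

func main() {
	targets := map[string][]string{
		"w1": {"/etc/hosts"},
		"w2": {"/tmp/a", "/tmp/b"},
	}

	wg := sync.WaitGroup{}
	wg.Add(len(targets)) // count is known up front, no per-iteration Add
	defer wg.Wait()

	for id, paths := range targets {
		// Calling the factory evaluates id and paths immediately, so each
		// task captures its own copies rather than the shared loop vars
		// (a real hazard before Go 1.22 changed loop-variable scoping).
		sentryGo(func(id string, paths []string) func() {
			return func() {
				defer wg.Done()
				fmt.Println(id, len(paths))
			}
		}(id, paths))
	}
}
```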

cluster/calcium/copy_test.go (+3 -2)

```diff
@@ -17,6 +17,7 @@ func TestCopy(t *testing.T) {
 	c := NewTestCluster()
 	ctx := context.Background()

+	// failed by target
 	_, err := c.Copy(ctx, &types.CopyOptions{
 		Targets: map[string][]string{},
 	})
@@ -32,7 +33,7 @@ func TestCopy(t *testing.T) {
 	}
 	store := c.store.(*storemocks.Store)
 	lock := &lockmocks.DistributedLock{}
-	lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
+	lock.On("Lock", mock.Anything).Return(ctx, nil)
 	lock.On("Unlock", mock.Anything).Return(nil)
 	store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
 	// failed by GetWorkload
@@ -47,7 +48,7 @@ func TestCopy(t *testing.T) {
 	workload.Engine = engine
 	store.On("GetWorkload", mock.Anything, mock.Anything).Return(workload, nil)
 	// failed by VirtualizationCopyFrom
-	engine.On("VirtualizationCopyFrom", mock.Anything, mock.Anything, mock.Anything).Return(nil, 0, 0, int64(0), types.ErrNilEngine).Twice()
+	engine.On("VirtualizationCopyFrom", mock.Anything, mock.Anything, mock.Anything).Return(nil, 0, 0, int64(0), types.ErrNoETCD).Twice()
 	ch, err = c.Copy(ctx, opts)
 	assert.NoError(t, err)
 	for r := range ch {
```
