Skip to content

Commit ac037e7

Browse files
authored
Resource Plugin (#491)
* resource plugin * modify rpc layer to adapt to the resource plugin * temporarily remove metrics * modify the engine layer to adapt to the resource plugin * modify cluster layer to adapt to the resource plugin * unit tests * metrics * compatible * lint & disable nolintlint temporarily * migerate tools & unit tests * review
1 parent 674a8b2 commit ac037e7

File tree

164 files changed

+11850
-10146
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

164 files changed

+11850
-10146
lines changed

Makefile

+7-8
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ deps:
2222
env GO111MODULE=on go mod vendor
2323

2424
binary:
25-
CGO_ENABLED=0 go build -ldflags "$(GO_LDFLAGS)" -o eru-core
25+
CGO_ENABLED=0 go build -ldflags "$(GO_LDFLAGS)" -gcflags=all=-G=3 -o eru-core
2626

2727
build: deps binary
2828

@@ -31,7 +31,6 @@ test: deps unit-test
3131
mock: deps
3232
mockery --dir vendor/google.golang.org/grpc --output 3rdmocks --name ServerStream
3333
mockery --dir vendor/github.com/docker/docker/client --output engine/docker/mocks --name APIClient
34-
mockery --dir scheduler --output scheduler/mocks --name Scheduler
3534
mockery --dir source --output source/mocks --name Source
3635
mockery --dir store --output store/mocks --name Store
3736
mockery --dir engine --output engine/mocks --name API
@@ -41,6 +40,7 @@ mock: deps
4140
mockery --dir store/etcdv3/meta --output store/etcdv3/meta/mocks --all
4241
mockery --dir vendor/go.etcd.io/etcd/client/v3 --output store/etcdv3/meta/mocks --name Txn
4342
mockery --dir rpc/gen/ --output rpc/mocks --name CoreRPC_RunAndWaitServer
43+
mockery --dir resources --output resources/mocks --name Plugin
4444

4545
.ONESHELL:
4646

@@ -49,22 +49,21 @@ cloc:
4949

5050
unit-test:
5151
go vet `go list ./... | grep -v '/vendor/' | grep -v '/tools'` && \
52-
go test -race -timeout 240s -count=1 -cover ./utils/... \
52+
go test -race -timeout 240s -count=1 -vet=off -cover ./utils/... \
5353
./types/... \
5454
./store/etcdv3/. \
5555
./store/etcdv3/embedded/. \
5656
./store/etcdv3/meta/. \
5757
./source/common/... \
5858
./strategy/... \
59-
./scheduler/complex/... \
6059
./rpc/. \
6160
./lock/etcdlock/... \
6261
./auth/simple/... \
6362
./discovery/helium... \
64-
./resources/types/. \
65-
./resources/storage/... \
66-
./resources/volume/... \
67-
./resources/cpumem/... \
63+
./resources/cpumem/models/... \
64+
./resources/cpumem/schedule/... \
65+
./resources/volume/models/... \
66+
./resources/volume/schedule/... \
6867
./wal/. \
6968
./wal/kv/. \
7069
./store/redis/... \

cluster/calcium/build.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,22 @@ func (c *Calcium) selectBuildNode(ctx context.Context) (*types.Node, error) {
7575
return nil, errors.WithStack(types.ErrInsufficientNodes)
7676
}
7777
// get idle max node
78-
node, err := c.scheduler.MaxIdleNode(nodes)
79-
return node, err
78+
return c.getMostIdleNode(ctx, nodes)
79+
}
80+
81+
func (c *Calcium) getMostIdleNode(ctx context.Context, nodes []*types.Node) (*types.Node, error) {
82+
nodeNames := []string{}
83+
nodeMap := map[string]*types.Node{}
84+
for _, node := range nodes {
85+
nodeNames = append(nodeNames, node.Name)
86+
nodeMap[node.Name] = node
87+
}
88+
89+
mostIdleNode, err := c.resource.GetMostIdleNode(ctx, nodeNames)
90+
if err != nil {
91+
return nil, err
92+
}
93+
return nodeMap[mostIdleNode], nil
8094
}
8195

8296
func (c *Calcium) buildFromSCM(ctx context.Context, node *types.Node, opts *types.BuildOptions) ([]string, io.ReadCloser, error) {

cluster/calcium/build_test.go

+12-6
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ import (
88
"testing"
99

1010
enginemocks "github.com/projecteru2/core/engine/mocks"
11-
schedulermocks "github.com/projecteru2/core/scheduler/mocks"
11+
"github.com/projecteru2/core/resources"
12+
resourcemocks "github.com/projecteru2/core/resources/mocks"
1213
storemocks "github.com/projecteru2/core/store/mocks"
1314
"github.com/projecteru2/core/types"
1415

@@ -82,13 +83,18 @@ func TestBuild(t *testing.T) {
8283
Engine: engine,
8384
}
8485
store.On("GetNodesByPod", mock.AnythingOfType("*context.emptyCtx"), mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{node}, nil)
85-
scheduler := &schedulermocks.Scheduler{}
86-
c.scheduler = scheduler
87-
// failed by MaxIdleNode
88-
scheduler.On("MaxIdleNode", mock.AnythingOfType("[]*types.Node")).Return(nil, types.ErrBadMeta).Once()
86+
87+
plugin := c.resource.GetPlugins()[0].(*resourcemocks.Plugin)
88+
89+
// failed by plugin error
90+
plugin.On("GetMostIdleNode", mock.Anything, mock.Anything).Return(nil, types.ErrGetMostIdleNodeFailed).Once()
8991
ch, err = c.BuildImage(ctx, opts)
9092
assert.Error(t, err)
91-
scheduler.On("MaxIdleNode", mock.AnythingOfType("[]*types.Node")).Return(node, nil)
93+
94+
plugin.On("GetMostIdleNode", mock.Anything, mock.Anything).Return(&resources.GetMostIdleNodeResponse{NodeName: node.Name, Priority: 100}, nil)
95+
plugin.On("GetNodeResourceInfo", mock.Anything, mock.Anything, mock.Anything).Return(&resources.GetNodeResourceInfoResponse{
96+
ResourceInfo: &resources.NodeResourceInfo{},
97+
}, nil)
9298
// create image
9399
c.config.Docker.Hub = "test.com"
94100
c.config.Docker.Namespace = "test"

cluster/calcium/calcium.go

+30-11
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ import (
99
"github.com/projecteru2/core/discovery"
1010
"github.com/projecteru2/core/discovery/helium"
1111
"github.com/projecteru2/core/log"
12-
"github.com/projecteru2/core/scheduler"
13-
complexscheduler "github.com/projecteru2/core/scheduler/complex"
12+
"github.com/projecteru2/core/resources"
13+
"github.com/projecteru2/core/resources/cpumem"
14+
"github.com/projecteru2/core/resources/volume"
1415
"github.com/projecteru2/core/source"
1516
"github.com/projecteru2/core/source/github"
1617
"github.com/projecteru2/core/source/gitlab"
@@ -24,7 +25,7 @@ import (
2425
type Calcium struct {
2526
config types.Config
2627
store store.Store
27-
scheduler scheduler.Scheduler
28+
resource *resources.PluginManager
2829
source source.Source
2930
watcher discovery.Service
3031
wal *WAL
@@ -41,13 +42,6 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
4142
return nil, logger.Err(context.TODO(), errors.WithStack(err))
4243
}
4344

44-
// set scheduler
45-
potassium, err := complexscheduler.New(config)
46-
if err != nil {
47-
return nil, logger.Err(context.TODO(), errors.WithStack(err))
48-
}
49-
scheduler.InitSchedulerV1(potassium)
50-
5145
// set scm
5246
var scm source.Source
5347
scmtype := strings.ToLower(config.Git.SCMType)
@@ -67,10 +61,35 @@ func New(config types.Config, t *testing.T) (*Calcium, error) {
6761
// set watcher
6862
watcher := helium.New(config.GRPCConfig, store)
6963

70-
cal := &Calcium{store: store, config: config, scheduler: potassium, source: scm, watcher: watcher}
64+
// set resource plugin manager
65+
resource, err := resources.NewPluginManager(config)
66+
if err != nil {
67+
return nil, logger.Err(context.TODO(), errors.WithStack(err))
68+
}
69+
70+
// load cpumem plugin
71+
cpumem, err := cpumem.NewPlugin(config)
72+
if err != nil {
73+
log.Errorf(context.TODO(), "[NewPluginManager] new cpumem plugin error: %v", err)
74+
return nil, err
75+
}
76+
resource.AddPlugins(cpumem)
77+
78+
// load volume plugin
79+
volume, err := volume.NewPlugin(config)
80+
if err != nil {
81+
log.Errorf(context.TODO(), "[NewPluginManager] new volume plugin error: %v", err)
82+
return nil, err
83+
}
84+
resource.AddPlugins(volume)
85+
86+
cal := &Calcium{store: store, config: config, source: scm, watcher: watcher, resource: resource}
87+
7188
cal.wal, err = newWAL(config, cal)
7289
cal.identifier = config.Identifier()
7390

91+
go cal.InitMetrics()
92+
7493
return cal, logger.Err(nil, errors.WithStack(err)) //nolint
7594
}
7695

cluster/calcium/calcium_test.go

+13-23
Original file line numberDiff line numberDiff line change
@@ -5,38 +5,21 @@ import (
55
"io/ioutil"
66
"os"
77
"path/filepath"
8-
"sync"
98
"testing"
109
"time"
1110

12-
schedulermocks "github.com/projecteru2/core/scheduler/mocks"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/mock"
13+
14+
"github.com/projecteru2/core/resources"
15+
resourcemocks "github.com/projecteru2/core/resources/mocks"
1316
sourcemocks "github.com/projecteru2/core/source/mocks"
1417
storemocks "github.com/projecteru2/core/store/mocks"
1518
"github.com/projecteru2/core/types"
1619
"github.com/projecteru2/core/wal"
1720
walmocks "github.com/projecteru2/core/wal/mocks"
18-
19-
"github.com/stretchr/testify/assert"
20-
"github.com/stretchr/testify/mock"
2121
)
2222

23-
// DummyLock replace lock for testing
24-
type dummyLock struct {
25-
m sync.Mutex
26-
}
27-
28-
// Lock for lock
29-
func (d *dummyLock) Lock(ctx context.Context) (context.Context, error) {
30-
d.m.Lock()
31-
return context.Background(), nil
32-
}
33-
34-
// Unlock for unlock
35-
func (d *dummyLock) Unlock(ctx context.Context) error {
36-
d.m.Unlock()
37-
return nil
38-
}
39-
4023
func NewTestCluster() *Calcium {
4124
walDir, err := ioutil.TempDir(os.TempDir(), "core.wal.*")
4225
if err != nil {
@@ -61,14 +44,21 @@ func NewTestCluster() *Calcium {
6144
HAKeepaliveInterval: 16 * time.Second,
6245
}
6346
c.store = &storemocks.Store{}
64-
c.scheduler = &schedulermocks.Scheduler{}
6547
c.source = &sourcemocks.Source{}
6648
c.wal = &WAL{WAL: &walmocks.WAL{}}
6749

6850
mwal := c.wal.WAL.(*walmocks.WAL)
6951
commit := wal.Commit(func() error { return nil })
7052
mwal.On("Log", mock.Anything, mock.Anything).Return(commit, nil)
7153

54+
plugin := &resourcemocks.Plugin{}
55+
plugin.On("Name").Return("mock-plugin")
56+
plugin.On("ResolveNodeResourceInfoToMetrics", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, context.DeadlineExceeded)
57+
if c.resource, err = resources.NewPluginManager(c.config); err != nil {
58+
panic(err)
59+
}
60+
c.resource.AddPlugins(plugin)
61+
7262
return c
7363
}
7464

cluster/calcium/capacity.go

+15-40
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55

66
"github.com/projecteru2/core/log"
77
"github.com/projecteru2/core/resources"
8-
resourcetypes "github.com/projecteru2/core/resources/types"
98
"github.com/projecteru2/core/strategy"
109
"github.com/projecteru2/core/types"
1110

@@ -22,58 +21,34 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio
2221
}
2322

2423
return msg, c.withNodesPodLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) error {
24+
nodes := []string{}
25+
for node := range nodeMap {
26+
nodes = append(nodes, node)
27+
}
28+
2529
if opts.DeployStrategy != strategy.Dummy {
26-
if _, msg.NodeCapacities, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
27-
logger.Errorf(ctx, "[Calcium.CalculateCapacity] doAllocResource failed: %+v", err)
30+
if msg.NodeCapacities, err = c.doGetDeployMap(ctx, nodes, opts); err != nil {
31+
logger.Errorf(ctx, "[Calcium.CalculateCapacity] doGetDeployMap failed: %+v", err)
2832
return err
2933
}
3034

3135
for _, capacity := range msg.NodeCapacities {
3236
msg.Total += capacity
3337
}
3438
} else {
35-
var infos []strategy.Info
36-
msg.Total, _, infos, err = c.doCalculateCapacity(ctx, nodeMap, opts)
39+
var infos map[string]*resources.NodeCapacityInfo
40+
infos, msg.Total, err = c.resource.GetNodesDeployCapacity(ctx, nodes, opts.ResourceOpts)
3741
if err != nil {
38-
logger.Errorf(ctx, "[Calcium.CalculateCapacity] doCalculateCapacity failed: %+v", err)
42+
logger.Errorf(ctx, "[Calcium.CalculateCapacity] failed to get nodes capacity: %+v", err)
3943
return err
4044
}
41-
for _, info := range infos {
42-
msg.NodeCapacities[info.Nodename] = info.Capacity
45+
if msg.Total <= 0 {
46+
return errors.Wrap(types.ErrInsufficientRes, "no node meets all the resource requirements at the same time")
47+
}
48+
for node, info := range infos {
49+
msg.NodeCapacities[node] = info.Capacity
4350
}
4451
}
4552
return nil
4653
})
4754
}
48-
49-
func (c *Calcium) doCalculateCapacity(ctx context.Context, nodeMap map[string]*types.Node, opts *types.DeployOptions) (
50-
total int,
51-
plans []resourcetypes.ResourcePlans,
52-
infos []strategy.Info,
53-
err error,
54-
) {
55-
if len(nodeMap) == 0 {
56-
return 0, nil, nil, errors.WithStack(types.ErrInsufficientNodes)
57-
}
58-
59-
resourceRequests, err := resources.MakeRequests(opts.ResourceOpts)
60-
if err != nil {
61-
return 0, nil, nil, err
62-
}
63-
64-
// select available nodes
65-
if plans, err = resources.SelectNodesByResourceRequests(ctx, resourceRequests, nodeMap); err != nil {
66-
return 0, nil, nil, err
67-
}
68-
69-
// deploy strategy
70-
infos = strategy.NewInfos(resourceRequests, nodeMap, plans)
71-
for _, info := range infos {
72-
total += info.Capacity
73-
}
74-
log.Debugf(ctx, "[Calcium.doCalculateCapacity] plans: %+v, total: %v", plans, total)
75-
if total <= 0 {
76-
return 0, nil, nil, errors.Wrap(types.ErrInsufficientRes, "no node meets all the resource requirements at the same time")
77-
}
78-
return
79-
}

0 commit comments

Comments
 (0)