Skip to content

Commit 65e7a4d

Browse files
committed
add storage plugin
1 parent 7e44783 commit 65e7a4d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

87 files changed

+2164
-8326
lines changed

cluster/calcium/calcium.go

+7-19
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ import (
1010
"github.com/projecteru2/core/discovery"
1111
"github.com/projecteru2/core/discovery/helium"
1212
"github.com/projecteru2/core/log"
13+
"github.com/projecteru2/core/resource3"
14+
"github.com/projecteru2/core/resource3/cobalt"
1315
"github.com/projecteru2/core/resources"
14-
"github.com/projecteru2/core/resources/cpumem"
15-
"github.com/projecteru2/core/resources/volume"
1616
"github.com/projecteru2/core/source"
1717
"github.com/projecteru2/core/source/github"
1818
"github.com/projecteru2/core/source/gitlab"
@@ -27,6 +27,7 @@ type Calcium struct {
2727
config types.Config
2828
store store.Store
2929
rmgr resources.Manager
30+
rmgr2 resource3.Manager
3031
source source.Source
3132
watcher discovery.Service
3233
wal wal.WAL
@@ -64,26 +65,13 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, erro
6465
watcher := helium.New(ctx, config.GRPCConfig, store)
6566

6667
// set resource plugin manager
67-
rmgr, err := resources.NewPluginsManager(config)
68+
rmgr2, err := cobalt.New(config)
6869
if err != nil {
6970
logger.Error(ctx, err)
7071
return nil, err
7172
}
72-
73-
// load internal plugins
74-
cpumem, err := cpumem.NewPlugin(config)
75-
if err != nil {
76-
logger.Error(ctx, err, "new cpumem plugin error")
77-
return nil, err
78-
}
79-
volume, err := volume.NewPlugin(config)
80-
if err != nil {
81-
logger.Error(ctx, err, "new volume plugin error")
82-
return nil, err
83-
}
84-
rmgr.AddPlugins(cpumem, volume)
85-
// load binary plugins
86-
if err = rmgr.LoadPlugins(ctx); err != nil {
73+
if err := rmgr2.LoadPlugins(ctx); err != nil {
74+
logger.Error(ctx, err)
8775
return nil, err
8876
}
8977

@@ -93,7 +81,7 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, erro
9381
return nil, err
9482
}
9583

96-
cal := &Calcium{store: store, config: config, source: scm, watcher: watcher, rmgr: rmgr, pool: pool}
84+
cal := &Calcium{store: store, config: config, source: scm, watcher: watcher, rmgr2: rmgr2, pool: pool}
9785

9886
cal.wal, err = enableWAL(config, cal, store)
9987
if err != nil {

cluster/calcium/node.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*typ
2020
logger.Error(ctx, err)
2121
return nil, err
2222
}
23-
var resourceCapacity map[string]types.NodeResourceArgs
24-
var resourceUsage map[string]types.NodeResourceArgs
23+
var resourceCapacity map[string]*types.RawNodeResource
2524
var node *types.Node
2625
var err error
2726

@@ -40,7 +39,7 @@ func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*typ
4039
ctx,
4140
// if: add node resource with resource plugins
4241
func(ctx context.Context) error {
43-
resourceCapacity, resourceUsage, err = c.rmgr.AddNode(ctx, opts.Nodename, opts.ResourceOpts, nodeInfo)
42+
resourceCapacity, err = c.rmgr2.AddNode(ctx, opts.Nodename, opts.ResourceOptions, nodeInfo)
4443
return err
4544
},
4645
// then: add node meta in store
@@ -50,7 +49,6 @@ func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*typ
5049
return err
5150
}
5251
node.Resource.Capacity = resourceCapacity
53-
node.Resource.Usage = resourceUsage
5452
_ = c.pool.Invoke(func() { c.doSendNodeMetrics(ctx, node) })
5553
return nil
5654
},
@@ -214,7 +212,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
214212
n.Labels = opts.Labels
215213
}
216214

217-
var originNodeResourceCapacity map[string]types.NodeResourceArgs
215+
var originNodeResourceCapacity map[string]*types.NodeResourceSettings
218216
return utils.Txn(ctx,
219217
// if: update node resource capacity success
220218
func(ctx context.Context) error {

cluster/cluster.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type Cluster interface {
5050
GetPod(ctx context.Context, podname string) (*types.Pod, error)
5151
ListPods(ctx context.Context) ([]*types.Pod, error)
5252
// pod resource
53-
PodResource(ctx context.Context, podname string) (chan *types.NodeResource, error)
53+
PodResource(ctx context.Context, podname string) (chan *types.NodeResourceInfo, error)
5454
// meta node
5555
AddNode(context.Context, *types.AddNodeOptions) (*types.Node, error)
5656
RemoveNode(ctx context.Context, nodename string) error
@@ -62,7 +62,7 @@ type Cluster interface {
6262
GetNodeStatus(ctx context.Context, nodename string) (*types.NodeStatus, error)
6363
NodeStatusStream(ctx context.Context) chan *types.NodeStatus
6464
// node resource
65-
NodeResource(ctx context.Context, nodename string, fix bool) (*types.NodeResource, error)
65+
NodeResource(ctx context.Context, nodename string, fix bool) (*types.NodeResourceInfo, error)
6666
// calculate capacity
6767
CalculateCapacity(context.Context, *types.DeployOptions) (*types.CapacityMessage, error)
6868
// meta workloads

metrics/handler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func (m *Metrics) ResourceMiddleware(cluster cluster.Cluster) func(http.Handler)
2121
logger.Error(ctx, err, "Get all nodes err")
2222
}
2323
for node := range nodes {
24-
metrics, err := m.rmgr.GetNodeMetrics(ctx, node)
24+
metrics, err := m.rmgr2.GetNodeMetrics(ctx, node)
2525
if err != nil {
2626
logger.Error(ctx, err, "Get metrics failed")
2727
continue

metrics/metrics.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ import (
77
"sync"
88

99
"github.com/projecteru2/core/log"
10+
"github.com/projecteru2/core/resource3"
11+
"github.com/projecteru2/core/resource3/cobalt"
12+
plugintypes "github.com/projecteru2/core/resource3/plugins/types"
1013
"github.com/projecteru2/core/resources"
1114
"github.com/projecteru2/core/types"
1215
"github.com/projecteru2/core/utils"
@@ -33,13 +36,13 @@ type Metrics struct {
3336

3437
Collectors map[string]prometheus.Collector
3538

36-
rmgr resources.Manager
39+
rmgr2 resource3.Manager
3740
}
3841

3942
// SendDeployCount update deploy counter
4043
func (m *Metrics) SendDeployCount(ctx context.Context, n int) {
4144
log.WithFunc("metrics.SendDeployCount").Info(ctx, "Update deploy counter")
42-
metrics := &resources.Metrics{
45+
metrics := &plugintypes.Metrics{
4346
Name: deployCountName,
4447
Labels: []string{m.Hostname},
4548
Key: deployCountKey,
@@ -50,7 +53,7 @@ func (m *Metrics) SendDeployCount(ctx context.Context, n int) {
5053
}
5154

5255
// SendMetrics update metrics
53-
func (m *Metrics) SendMetrics(ctx context.Context, metrics ...*resources.Metrics) {
56+
func (m *Metrics) SendMetrics(ctx context.Context, metrics ...*plugintypes.Metrics) {
5457
logger := log.WithFunc("metrics.SendMetrics")
5558
for _, metric := range metrics {
5659
collector, ok := m.Collectors[metric.Name]
@@ -133,7 +136,7 @@ func InitMetrics(config types.Config, metricsDescriptions []*resources.MetricsDe
133136
if err != nil {
134137
return err
135138
}
136-
rmgr, err := resources.NewPluginsManager(config)
139+
rmgr2, err := cobalt.New(config)
137140
if err != nil {
138141
return err
139142
}
@@ -142,7 +145,7 @@ func InitMetrics(config types.Config, metricsDescriptions []*resources.MetricsDe
142145
StatsdAddr: config.Statsd,
143146
Hostname: utils.CleanStatsdMetrics(hostname),
144147
Collectors: map[string]prometheus.Collector{},
145-
rmgr: rmgr,
148+
rmgr2: rmgr2,
146149
}
147150

148151
for _, desc := range metricsDescriptions {

resource3/cobalt/alloc.go

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package cobalt
2+
3+
import (
4+
"context"
5+
6+
"github.com/projecteru2/core/log"
7+
"github.com/projecteru2/core/resource3/plugins"
8+
plugintypes "github.com/projecteru2/core/resource3/plugins/types"
9+
"github.com/projecteru2/core/types"
10+
"github.com/projecteru2/core/utils"
11+
)
12+
13+
/*
14+
Alloc
15+
opts struct
16+
17+
{
18+
"plugin1":{
19+
"cpu-request": 1.2,
20+
"cpu-limit": 2.0,
21+
},
22+
"plugin2":{
23+
},
24+
}
25+
*/
26+
func (m Manager) Alloc(ctx context.Context, nodename string, deployCount int, opts types.Resources) ([]types.Resources, []types.Resources, error) {
27+
logger := log.WithFunc("resource.coblat.Alloc")
28+
29+
// index -> no, map by plugin name
30+
workloadsParams := []types.Resources{}
31+
engineParams := []types.Resources{}
32+
33+
// init engine args
34+
for i := 0; i < deployCount; i++ {
35+
workloadsParams[i] = types.Resources{}
36+
engineParams[i] = types.Resources{}
37+
}
38+
39+
return workloadsParams, engineParams, utils.PCR(ctx,
40+
// prepare: calculate engine args and resource args
41+
func(ctx context.Context) error {
42+
resps, err := call(ctx, m.plugins, func(plugin plugins.Plugin) (*plugintypes.CalculateDeployResponse, error) {
43+
resp, err := plugin.CalculateDeploy(ctx, nodename, deployCount, opts[plugin.Name()])
44+
if err != nil {
45+
logger.Errorf(ctx, err, "plugin %+v failed to compute alloc args, request %+v, node %+v, deploy count %+v", plugin.Name(), opts, nodename, deployCount)
46+
}
47+
return resp, err
48+
})
49+
if err != nil {
50+
return err
51+
}
52+
53+
// calculate engine args
54+
for plugin, resp := range resps {
55+
logger.Debug(ctx, plugin.Name())
56+
for index, params := range resp.WorkloadsResource {
57+
workloadsParams[index][plugin.Name()] = params
58+
}
59+
for index, params := range resp.EnginesParams {
60+
v := engineParams[index][plugin.Name()]
61+
vMerged, err := m.mergeEngineParams(ctx, v, params)
62+
if err != nil {
63+
logger.Error(ctx, err, "invalid engine args")
64+
return err
65+
}
66+
engineParams[index][plugin.Name()] = vMerged
67+
}
68+
}
69+
return nil
70+
},
71+
// commit: update node resources
72+
func(ctx context.Context) error {
73+
// TODO why incr?
74+
if _, _, err := m.SetNodeResourceUsage(ctx, nodename, nil, nil, workloadsParams, true, plugins.Incr); err != nil {
75+
logger.Error(ctx, err, "failed to update node resource")
76+
return err
77+
}
78+
return nil
79+
},
80+
// rollback: do nothing
81+
func(ctx context.Context) error {
82+
return nil
83+
},
84+
m.config.GlobalTimeout,
85+
)
86+
}
87+
88+
// RollbackAlloc rollbacks the allocated resource
89+
func (m Manager) RollbackAlloc(ctx context.Context, nodename string, workloadsParams []types.Resources) error {
90+
_, _, err := m.SetNodeResourceUsage(ctx, nodename, nil, nil, workloadsParams, true, plugins.Decr)
91+
return err
92+
}

resource3/cobalt/call.go

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package cobalt
2+
3+
import (
4+
"context"
5+
6+
"github.com/cockroachdb/errors"
7+
"github.com/projecteru2/core/log"
8+
"github.com/projecteru2/core/resource3/plugins"
9+
)
10+
11+
func call[T any](ctx context.Context, ps []plugins.Plugin, f func(plugins.Plugin) (T, error)) (map[plugins.Plugin]T, error) {
12+
// TODO 并行化,意义不大
13+
var combinedErr error
14+
results := map[plugins.Plugin]T{}
15+
16+
for _, p := range ps {
17+
result, err := f(p)
18+
if err != nil {
19+
log.WithFunc("resource.cobalt.call").Errorf(ctx, err, "failed to call plugin %+v", p.Name())
20+
combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, p.Name()))
21+
continue
22+
}
23+
results[p] = result
24+
}
25+
26+
return results, combinedErr
27+
}

resource3/cobalt/cobalt.go

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package cobalt
2+
3+
import (
4+
"context"
5+
6+
"github.com/projecteru2/core/log"
7+
"github.com/projecteru2/core/resource3/plugins"
8+
"github.com/projecteru2/core/resource3/plugins/binary"
9+
"github.com/projecteru2/core/resource3/plugins/cpumem"
10+
"github.com/projecteru2/core/types"
11+
"github.com/projecteru2/core/utils"
12+
)
13+
14+
// Manager manager plugins
15+
type Manager struct {
16+
config types.Config
17+
plugins []plugins.Plugin
18+
}
19+
20+
// New creates a plugin manager
21+
func New(config types.Config) (*Manager, error) {
22+
m := &Manager{
23+
config: config,
24+
plugins: []plugins.Plugin{},
25+
}
26+
27+
return m, nil
28+
}
29+
30+
// LoadPlugins .
31+
func (m *Manager) LoadPlugins(ctx context.Context) error {
32+
logger := log.WithFunc("resource.cobalt.LoadPlugins")
33+
// Load internal
34+
cm, err := cpumem.NewPlugin(ctx, m.config)
35+
if err != nil {
36+
return err
37+
}
38+
m.AddPlugins(cm)
39+
40+
// st, err := storage.NewPlugin(ctx, m.config)
41+
// if err != nil {
42+
// return err
43+
// }
44+
// m.AddPlugins(st)
45+
46+
if m.config.ResourcePlugin.Dir == "" {
47+
return nil
48+
}
49+
50+
pluginFiles, err := utils.ListAllExecutableFiles(m.config.ResourcePlugin.Dir)
51+
if err != nil {
52+
logger.Errorf(ctx, err, "failed to list all executable files dir: %+v", m.config.ResourcePlugin.Dir)
53+
return err
54+
}
55+
56+
cache := map[string]struct{}{}
57+
for _, plugin := range m.plugins {
58+
cache[plugin.Name()] = struct{}{}
59+
}
60+
61+
for _, file := range pluginFiles {
62+
logger.Infof(ctx, "load binary plugin: %+v", file)
63+
b, err := binary.NewPlugin(ctx, file, m.config)
64+
if err != nil {
65+
return err
66+
}
67+
if _, ok := cache[b.Name()]; ok {
68+
continue
69+
}
70+
m.AddPlugins(b)
71+
}
72+
73+
return nil
74+
}
75+
76+
// AddPlugins adds a plugin (for test and debug)
77+
func (m *Manager) AddPlugins(plugins ...plugins.Plugin) {
78+
m.plugins = append(m.plugins, plugins...)
79+
}
80+
81+
// GetPlugins is used for mock
82+
func (m *Manager) GetPlugins() []plugins.Plugin {
83+
return m.plugins
84+
}

0 commit comments

Comments
 (0)