
Commit cde52c9 (parent 8153d74)

add a deploy plan based on node resource usage

15 files changed: +206 -23 lines

cluster/calcium/helper.go (+3 -1)

@@ -344,13 +344,15 @@ func filterNode(node *types.Node, labels map[string]string) bool {
 	return true
 }
 
-func getNodesInfo(nodes map[string]*types.Node) []types.NodeInfo {
+func getNodesInfo(nodes map[string]*types.Node, cpu float64, memory int64) []types.NodeInfo {
 	result := []types.NodeInfo{}
 	for _, node := range nodes {
 		nodeInfo := types.NodeInfo{
 			Name:     node.Name,
 			CPUMap:   node.CPU,
 			MemCap:   node.MemCap,
+			CPURate:  cpu / float64(len(node.InitCPU)),
+			MemRate:  float64(memory) / float64(node.InitMemCap),
 			CPUUsage: node.CPUUsage / float64(len(node.InitCPU)),
 			MemUsage: 1.0 - float64(node.MemCap)/float64(node.InitMemCap),
 			Capacity: 0,
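
The two new NodeInfo fields express the per-container request as a fraction of the node's initial capacity: CPURate spreads opts.CPUQuota over the node's initial core count, and MemRate divides opts.Memory by InitMemCap. A minimal sketch of that arithmetic, using purely hypothetical node and request sizes (not values from this diff):

package main

import "fmt"

func main() {
	// Hypothetical node and request, only to illustrate the rate computation above.
	initCPU := 16.0                           // stands in for float64(len(node.InitCPU))
	initMemCap := float64(64 << 30)           // stands in for node.InitMemCap (64 GiB)
	cpuQuota, memory := 0.5, float64(512<<20) // stand in for opts.CPUQuota, opts.Memory (512 MiB)

	cpuRate := cpuQuota / initCPU  // usage one container adds on the CPU axis
	memRate := memory / initMemCap // usage one container adds on the memory axis
	fmt.Println(cpuRate, memRate)  // 0.03125 0.0078125
}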

cluster/calcium/lambda.go (+1 -1)

@@ -24,7 +24,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, std
 	opts.Entrypoint.Log = &types.LogConfig{Type: "json-file"}
 
 	// count = 1 && OpenStdin
-	if opts.OpenStdin && (opts.Count != 1 || opts.DeployMethod != cluster.DeployAuto) {
+	if opts.OpenStdin && (opts.Count != 1 || opts.DeployMethod != cluster.DeployGlobal) {
 		close(ch)
 		log.Errorf("Count %d method %s", opts.Count, opts.DeployMethod)
 		return ch, types.ErrRunAndWaitCountOneWithStdin

cluster/calcium/realloc.go (+2 -2)

@@ -73,12 +73,12 @@ func (c *Calcium) ReallocResource(ctx context.Context, IDs []string, cpu float64
 	// deal with normal container
 	for pod, nodeContainersInfo := range containersInfo {
 		switch pod.Favor {
-		case scheduler.MEMORY_PRIOR:
+		case scheduler.MemoryPrior:
 			go func(nodeContainersInfo nodeContainers) {
 				defer wg.Done()
 				c.doReallocContainerWithMemoryPrior(ctx, ch, nodeContainersInfo, cpu, mem)
 			}(nodeContainersInfo)
-		case scheduler.CPU_PRIOR:
+		case scheduler.CPUPrior:
 			go func(nodeContainersInfo nodeContainers) {
 				defer wg.Done()
 				c.doReallocContainersWithCPUPrior(ctx, ch, nodeContainersInfo, cpu, mem)

cluster/calcium/realloc_test.go (+2 -2)

@@ -80,7 +80,7 @@ func TestReallocMem(t *testing.T) {
 	}
 	store.On("GetNode", mock.Anything, mock.Anything, mock.Anything).Return(node1, nil)
 	// node cap not enough
-	pod1.Favor = scheduler.MEMORY_PRIOR
+	pod1.Favor = scheduler.MemoryPrior
 	ch, err = c.ReallocResource(ctx, []string{"c1"}, 0, 2*types.GByte)
 	assert.NoError(t, err)
 	for c := range ch {
@@ -163,7 +163,7 @@ func TestReallocCPU(t *testing.T) {
 	store.On("GetContainer", mock.Anything, mock.Anything).Return(c1, nil)
 	store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)
 	store.On("GetNode", mock.Anything, mock.Anything, mock.Anything).Return(node1, nil)
-	pod1.Favor = scheduler.CPU_PRIOR
+	pod1.Favor = scheduler.CPUPrior
 	// wrong cpu
 	ch, err := c.ReallocResource(ctx, []string{"c1"}, -1, 2*types.GByte)
 	assert.NoError(t, err)

cluster/calcium/resource.go (+5 -3)

@@ -77,7 +77,7 @@ func (c *Calcium) doAllocResource(ctx context.Context, opts *types.DeployOptions
 		return nil, err
 	}
 	defer c.doUnlockAll(nodeLocks)
-	nodesInfo = getNodesInfo(nodes)
+	nodesInfo = getNodesInfo(nodes, opts.CPUQuota, opts.Memory)
 
 	// load the status of previous deployments
 	nodesInfo, err = c.store.MakeDeployStatus(ctx, opts, nodesInfo)
@@ -86,9 +86,9 @@ func (c *Calcium) doAllocResource(ctx context.Context, opts *types.DeployOptions
 	}
 
 	switch podType {
-	case scheduler.MEMORY_PRIOR:
+	case scheduler.MemoryPrior:
 		nodesInfo, total, err = c.scheduler.SelectMemoryNodes(nodesInfo, opts.CPUQuota, opts.Memory) // still measured in bytes, no conversion
-	case scheduler.CPU_PRIOR:
+	case scheduler.CPUPrior:
 		nodesInfo, nodeCPUPlans, total, err = c.scheduler.SelectCPUNodes(nodesInfo, opts.CPUQuota, opts.Memory)
 	default:
 		return nil, types.ErrBadPodType
@@ -104,6 +104,8 @@ func (c *Calcium) doAllocResource(ctx context.Context, opts *types.DeployOptions
 		nodesInfo, err = c.scheduler.EachDivision(nodesInfo, opts.Count, opts.NodesLimit)
 	case cluster.DeployFill:
 		nodesInfo, err = c.scheduler.FillDivision(nodesInfo, opts.Count, opts.NodesLimit)
+	case cluster.DeployGlobal:
+		nodesInfo, err = c.scheduler.GlobalDivision(nodesInfo, opts.Count, total)
 	default:
 		return nil, types.ErrBadDeployMethod
 	}

cluster/calcium/resource_test.go (+12 -8)

@@ -153,35 +153,39 @@ func TestAllocResource(t *testing.T) {
 	c.scheduler = sched
 	// wrong select
 	sched.On("SelectMemoryNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, 0, types.ErrInsufficientMEM).Once()
-	_, err = c.doAllocResource(ctx, opts, scheduler.MEMORY_PRIOR)
+	_, err = c.doAllocResource(ctx, opts, scheduler.MemoryPrior)
 	assert.Error(t, err)
 	//cpu select
 	total := 3
 	sched.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nodesInfo, nodeCPUPlans, total, nil)
 	// wrong DeployMethod
-	_, err = c.doAllocResource(ctx, opts, scheduler.CPU_PRIOR)
+	_, err = c.doAllocResource(ctx, opts, scheduler.CPUPrior)
 	assert.Error(t, err)
 	// other methods
 	sched.On("CommonDivision", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrInsufficientRes)
 	opts.DeployMethod = cluster.DeployAuto
-	_, err = c.doAllocResource(ctx, opts, scheduler.CPU_PRIOR)
+	_, err = c.doAllocResource(ctx, opts, scheduler.CPUPrior)
+	assert.Error(t, err)
+	sched.On("GlobalDivision", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrInsufficientRes)
+	opts.DeployMethod = cluster.DeployGlobal
+	_, err = c.doAllocResource(ctx, opts, scheduler.CPUPrior)
 	assert.Error(t, err)
 	sched.On("EachDivision", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrInsufficientRes)
 	opts.DeployMethod = cluster.DeployEach
-	_, err = c.doAllocResource(ctx, opts, scheduler.CPU_PRIOR)
+	_, err = c.doAllocResource(ctx, opts, scheduler.CPUPrior)
 	assert.Error(t, err)
 	// fill division but no nodes failed
 	sched.On("FillDivision", mock.Anything, mock.Anything, mock.Anything).Return([]types.NodeInfo{}, nil).Once()
 	opts.DeployMethod = cluster.DeployFill
-	_, err = c.doAllocResource(ctx, opts, scheduler.CPU_PRIOR)
+	_, err = c.doAllocResource(ctx, opts, scheduler.CPUPrior)
 	assert.Error(t, err)
 	// fill division but UpdateNodeResource failed
 	sched.On("FillDivision", mock.Anything, mock.Anything, mock.Anything).Return(nodesInfo, nil)
 	store.On("UpdateNodeResource",
 		mock.Anything, mock.Anything, mock.Anything,
 		mock.Anything, mock.Anything, mock.Anything,
 	).Return(types.ErrNoETCD).Once()
-	_, err = c.doAllocResource(ctx, opts, scheduler.CPU_PRIOR)
+	_, err = c.doAllocResource(ctx, opts, scheduler.CPUPrior)
 	assert.Error(t, err)
 	// fill division sucessed
 	store.On("UpdateNodeResource",
@@ -192,13 +196,13 @@ func TestAllocResource(t *testing.T) {
 	store.On("SaveProcessing",
 		mock.Anything, mock.Anything, mock.Anything,
 	).Return(types.ErrNoETCD).Once()
-	_, err = c.doAllocResource(ctx, opts, scheduler.CPU_PRIOR)
+	_, err = c.doAllocResource(ctx, opts, scheduler.CPUPrior)
 	assert.Error(t, err)
 	// bind process
 	store.On("SaveProcessing",
 		mock.Anything, mock.Anything, mock.Anything,
 	).Return(nil)
-	nsi, err := c.doAllocResource(ctx, opts, scheduler.CPU_PRIOR)
+	nsi, err := c.doAllocResource(ctx, opts, scheduler.CPUPrior)
 	assert.NoError(t, err)
 	assert.Len(t, nsi, 1)
 	assert.Equal(t, nsi[0].Name, n2)

cluster/cluster.go (+2)

@@ -24,6 +24,8 @@ const (
 	DeployEach = "each"
 	// DeployFill for fill node plan
 	DeployFill = "fill"
+	// DeployGlobal for global node resource plan
+	DeployGlobal = "global"
 	// ERUMark mark container controlled by eru
 	ERUMark = "ERU"
 	// ERUMeta store publish and health things
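
With the constant in place, a caller opts into the new plan exactly as it would pick auto, each, or fill. A hedged sketch of such a request follows; only DeployMethod, Count, CPUQuota and Memory are confirmed by this diff, any other DeployOptions fields are intentionally omitted, and the usual github.com/projecteru2/core/types and cluster imports are assumed:

// Hypothetical options selecting the global plan.
opts := &types.DeployOptions{
	Count:        3,                    // containers to deploy
	CPUQuota:     0.5,                  // CPU per container
	Memory:       types.GByte,          // memory per container, in bytes
	DeployMethod: cluster.DeployGlobal, // routes doAllocResource to GlobalDivision
}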

scheduler/complex/global.go (+52)

@@ -0,0 +1,52 @@
+package complexscheduler
+
+import (
+	"sort"
+
+	"github.com/projecteru2/core/types"
+	"github.com/projecteru2/core/utils"
+	log "github.com/sirupsen/logrus"
+)
+
+// GlobalDivisionPlan plans deployments based on the global resource quota
+func GlobalDivisionPlan(arg []types.NodeInfo, need int) ([]types.NodeInfo, error) {
+	sort.Slice(arg, func(i, j int) bool { return arg[i].CPUUsage+arg[i].MemUsage < arg[j].CPUUsage+arg[j].MemUsage })
+	length := len(arg)
+	i := 0
+
+	var deploy int
+	var delta float64
+	for need > 0 {
+		p := i
+		deploy = 0
+		delta = 0.0
+		if i < length-1 {
+			delta = utils.Round(arg[i+1].CPUUsage + arg[i+1].MemUsage - arg[i].CPUUsage - arg[i].MemUsage)
+			i++
+		}
+		for j := 0; j <= p && need > 0 && delta >= 0; j++ {
+			// pruning
+			if arg[j].Capacity == 0 {
+				continue
+			}
+			cost := utils.Round(arg[j].CPURate + arg[j].MemRate)
+			deploy = int(delta / cost)
+			if deploy == 0 {
+				deploy = 1
+			}
+			if deploy > arg[j].Capacity {
+				deploy = arg[j].Capacity
+			}
+			if deploy > need {
+				deploy = need
+			}
+			arg[j].Deploy += deploy
+			arg[j].Capacity -= deploy
+			need -= deploy
+		}
+	}
+	// need is guaranteed to reach 0 here, since volTotal is guaranteed to be at least need
+	// no need to sort again: the final Deploy plan already follows the resource-usage ordering
+	log.Debugf("[GlobalDivisionPlan] nodesInfo: %v", arg)
+	return arg, nil
+}
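
The plan is a water-filling pass over nodes sorted by combined usage (CPUUsage + MemUsage): on each step, the nodes seen so far each receive roughly delta/cost containers, where delta is the usage gap to the next busier node and cost (CPURate + MemRate) is the usage one more container would add, so the least-loaded nodes are topped up until they catch up with the rest. Below is a minimal, self-contained sketch of that idea on a simplified struct; it drops the utils.Round calls and the logging, and all names are illustrative, not part of the diff:

package main

import (
	"fmt"
	"sort"
)

// node is a stand-in for types.NodeInfo with only the fields the plan needs.
type node struct {
	name     string
	usage    float64 // current CPUUsage + MemUsage
	cost     float64 // CPURate + MemRate: usage added by one container
	capacity int
	deploy   int
}

// globalPlan mirrors the loop in GlobalDivisionPlan: fill the least-used nodes
// until their usage catches up with the next node, repeat until done.
// It assumes total capacity is at least need, as GlobalDivision checks upstream.
func globalPlan(nodes []node, need int) []node {
	sort.Slice(nodes, func(i, j int) bool { return nodes[i].usage < nodes[j].usage })
	i := 0
	for need > 0 {
		p := i
		delta := 0.0
		if i < len(nodes)-1 {
			delta = nodes[i+1].usage - nodes[i].usage // gap to the next busier node
			i++
		}
		for j := 0; j <= p && need > 0; j++ {
			if nodes[j].capacity == 0 {
				continue // node already full
			}
			d := int(delta / nodes[j].cost) // containers needed to close the gap
			if d == 0 {
				d = 1
			}
			if d > nodes[j].capacity {
				d = nodes[j].capacity
			}
			if d > need {
				d = need
			}
			nodes[j].deploy += d
			nodes[j].capacity -= d
			need -= d
		}
	}
	return nodes
}

func main() {
	plan := globalPlan([]node{
		{name: "busy", usage: 0.8, cost: 0.05, capacity: 10},
		{name: "idle", usage: 0.5, cost: 0.05, capacity: 10},
	}, 4)
	fmt.Println(plan) // the idle node absorbs all 4 containers before busy gets any
}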

scheduler/complex/global_test.go (+63)

@@ -0,0 +1,63 @@
+package complexscheduler
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/projecteru2/core/types"
+)
+
+func TestGlobalDivisionPlan1(t *testing.T) {
+	n1 := types.NodeInfo{
+		Name:     "n1",
+		CPUUsage: 0.1,
+		MemUsage: 0.7,
+		CPURate:  0.03,
+		MemRate:  0.02,
+		Capacity: 1,
+	}
+	n2 := types.NodeInfo{
+		Name:     "n2",
+		CPUUsage: 0.2,
+		MemUsage: 0.3,
+		CPURate:  0.04,
+		MemRate:  0.07,
+		Capacity: 1,
+	}
+	n3 := types.NodeInfo{
+		Name:     "n3",
+		CPUUsage: 1.3,
+		MemUsage: 0.9,
+		CPURate:  0.01,
+		MemRate:  0.04,
+		Capacity: 1,
+	}
+	arg := []types.NodeInfo{n3, n2, n1}
+	r, err := GlobalDivisionPlan(arg, 3)
+	assert.NoError(t, err)
+	assert.Equal(t, r[0].Deploy, 1)
+}
+
+func TestGlobalDivisionPlan2(t *testing.T) {
+	n1 := types.NodeInfo{
+		Name:     "n1",
+		CPUUsage: 0.9,
+		MemUsage: 0.7,
+		CPURate:  0.03,
+		MemRate:  0.02,
+		Capacity: 100,
+	}
+	n2 := types.NodeInfo{
+		Name:     "n2",
+		CPUUsage: 0.2,
+		MemUsage: 0.3,
+		CPURate:  0.04,
+		MemRate:  0.07,
+		Capacity: 100,
+	}
+	arg := []types.NodeInfo{n2, n1}
+	r, err := GlobalDivisionPlan(arg, 2)
+	assert.NoError(t, err)
+	assert.Equal(t, r[0].Deploy, 2)
+}

scheduler/complex/potassium.go (+11)

@@ -110,3 +110,14 @@ func (m *Potassium) EachDivision(nodesInfo []types.NodeInfo, need, limit int) ([
 func (m *Potassium) FillDivision(nodesInfo []types.NodeInfo, need, limit int) ([]types.NodeInfo, error) {
 	return FillPlan(nodesInfo, need, limit)
 }
+
+// GlobalDivision deploy containers by their resource costs
+// tries to even out resource consumption across nodes
+// need is the total amount requested, total is the total deployable capacity
+func (m *Potassium) GlobalDivision(nodesInfo []types.NodeInfo, need, total int) ([]types.NodeInfo, error) {
+	if total < need {
+		return nil, types.NewDetailedErr(types.ErrInsufficientRes,
+			fmt.Sprintf("need: %d, vol: %d", need, total))
+	}
+	return GlobalDivisionPlan(nodesInfo, need)
+}
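
GlobalDivision is the Scheduler method that doAllocResource dispatches to for DeployGlobal: need is opts.Count and total is the overall capacity reported by SelectMemoryNodes or SelectCPUNodes, so an impossible request fails fast with ErrInsufficientRes instead of looping inside the plan. A hypothetical call, assuming nodesInfo has already been through one of the Select* steps and m is a *Potassium:

// Hypothetical invocation: 5 containers requested, 12 deployable slots
// reported by SelectMemoryNodes / SelectCPUNodes.
nodesInfo, err := m.GlobalDivision(nodesInfo, 5, 12)
if err != nil {
	return nil, err // need > total surfaces here as a detailed ErrInsufficientRes
}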

scheduler/complex/potassium_test.go (+20)

@@ -932,3 +932,23 @@ func TestMaxIdleNode(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, node.Name, n2.Name)
 }
+
+func TestGlobalDivision(t *testing.T) {
+	k, _ := newPotassium()
+	_, err := k.GlobalDivision([]types.NodeInfo{}, 10, 1)
+	assert.Error(t, err)
+	nodeInfo := types.NodeInfo{
+		Name:     "n1",
+		CPUUsage: 0.7,
+		MemUsage: 0.3,
+		CPURate:  0.1,
+		MemRate:  0.2,
+		Capacity: 100,
+		Count:    21,
+		Deploy:   0,
+	}
+	r, err := k.GlobalDivision([]types.NodeInfo{nodeInfo}, 10, 100)
+	assert.NoError(t, err)
+	assert.Equal(t, r[0].Deploy, 10)
+	fmt.Println(r)
+}

scheduler/mocks/Scheduler.go (+23)

Generated mock for the Scheduler interface; diff not rendered.

scheduler/scheduler.go (+6 -4)

@@ -3,10 +3,10 @@ package scheduler
 import "github.com/projecteru2/core/types"
 
 const (
-	// CPU_PRIOR define cpu select
-	CPU_PRIOR = "CPU"
-	// MEMORY_PRIOR define mem select
-	MEMORY_PRIOR = "MEM"
+	// CPUPrior define cpu select
+	CPUPrior = "CPU"
+	// MemoryPrior define mem select
+	MemoryPrior = "MEM"
 )
 
 // Scheduler is a scheduler is used to determine which nodes are we gonna use.
@@ -26,4 +26,6 @@ type Scheduler interface {
 	EachDivision(nodesInfo []types.NodeInfo, need, limit int) ([]types.NodeInfo, error)
 	// fill division
 	FillDivision(nodesInfo []types.NodeInfo, need, limit int) ([]types.NodeInfo, error)
+	// global division
+	GlobalDivision(nodesInfo []types.NodeInfo, need, total int) ([]types.NodeInfo, error)
 }

store/etcdv3/pod.go (+2 -2)

@@ -18,8 +18,8 @@ func (m *Mercury) AddPod(ctx context.Context, name, favor, desc string) (*types.
 	key := fmt.Sprintf(podInfoKey, name)
 	favor = strings.ToUpper(favor)
 	if favor == "" {
-		favor = scheduler.MEMORY_PRIOR
-	} else if favor != scheduler.MEMORY_PRIOR && favor != scheduler.CPU_PRIOR {
+		favor = scheduler.MemoryPrior
+	} else if favor != scheduler.MemoryPrior && favor != scheduler.CPUPrior {
 		return nil, types.NewDetailedErr(types.ErrBadFaver,
 			fmt.Sprintf("got bad faver: %s", favor))
 	}

types/node.go (+2)

@@ -102,6 +102,8 @@ type NodeInfo struct {
 	MemCap   int64
 	CPUUsage float64 // current CPU usage ratio
 	MemUsage float64 // current MEM usage ratio
+	CPURate  float64 // CPU usage ratio one deployment would add
+	MemRate  float64 // memory usage ratio one deployment would add
 
 	CPUPlan  []CPUMap
 	Capacity int // how many can be deployed
