Skip to content

Commit 753aa93

Browse files
New Communisim Strategy (#313)
* new communisim: heap based * eliminate strategy side effects * add more tests for communism
1 parent 9a9165c commit 753aa93

9 files changed

+194
-117
lines changed

scheduler/complex/potassium_test.go

+24-23
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package complexscheduler
33
import (
44
"errors"
55
"fmt"
6+
"sort"
67
"testing"
78

89
"math"
@@ -1011,8 +1012,10 @@ func TestSelectMemoryNodesSequence(t *testing.T) {
10111012
refreshPod(res, deployMap, mem, 0)
10121013
res, deployMap, err = SelectMemoryNodes(k, res, nil, cpu, mem, 5, false)
10131014
assert.NoError(t, err)
1014-
assert.Equal(t, deployMap[res[0].Name], 3)
1015-
assert.Equal(t, deployMap[res[1].Name], 2)
1015+
finalCounts := []int{deployMap[res[0].Name], deployMap[res[1].Name]}
1016+
sort.Ints(finalCounts)
1017+
assert.ElementsMatch(t, []int{2, 3}, finalCounts)
1018+
10161019
}
10171020

10181021
func TestSelectMemoryNodesGiven(t *testing.T) {
@@ -1024,15 +1027,15 @@ func TestSelectMemoryNodesGiven(t *testing.T) {
10241027
}
10251028

10261029
k, _ := newPotassium()
1027-
res, deployMap, err := SelectMemoryNodes(k, pod, countMap, 1.0, 512*int64(units.MiB), 2, false)
1030+
_, deployMap, err := SelectMemoryNodes(k, pod, countMap, 1.0, 512*int64(units.MiB), 2, false)
10281031
assert.NoError(t, err)
1029-
for _, node := range res {
1030-
if node.Name == "n3" {
1031-
assert.Equal(t, deployMap[node.Name], 2)
1032-
continue
1033-
}
1034-
assert.Equal(t, deployMap[node.Name], 0)
1032+
finalCounts := []int{}
1033+
for _, node := range pod {
1034+
finalCounts = append(finalCounts, countMap[node.Name]+deployMap[node.Name])
10351035
}
1036+
sort.Ints(finalCounts)
1037+
assert.ElementsMatch(t, []int{1, 1, 1, 2}, finalCounts)
1038+
10361039
}
10371040

10381041
func TestMaxIdleNode(t *testing.T) {
@@ -1219,10 +1222,11 @@ func TestSelectStorageNodesSequence(t *testing.T) {
12191222
res, deployMap, err := SelectStorageNodes(k, scheduleInfos, nil, stor, 1, false)
12201223
assert.NoError(t, err)
12211224
assert.Equal(t, 2, len(res))
1222-
assert.Equal(t, 1, deployMap[res[0].Name])
12231225
assert.Equal(t, 1, res[0].Capacity)
1224-
assert.Equal(t, 0, deployMap[res[1].Name])
12251226
assert.Equal(t, 2, res[1].Capacity)
1227+
counts := []int{deployMap[scheduleInfos[0].Name], deployMap[scheduleInfos[1].Name]}
1228+
sort.Ints(counts)
1229+
assert.ElementsMatch(t, []int{0, 1}, counts)
12261230

12271231
refreshPod(res, deployMap, mem, stor)
12281232
countMap := map[string]int{
@@ -1251,10 +1255,14 @@ func TestSelectStorageNodesSequence(t *testing.T) {
12511255
return
12521256
}
12531257
i, j := getLess(res)
1254-
assert.Equal(t, 0, deployMap[res[i].Name])
1255-
assert.Equal(t, 2, deployMap[res[j].Name])
1256-
assert.Equal(t, 1, res[i].Capacity)
1257-
assert.Equal(t, 0, res[j].Capacity)
1258+
getFinalCounts := func(deployMap, countMap map[string]int) (counts []int) {
1259+
for name, d := range deployMap {
1260+
counts = append(counts, d+countMap[name])
1261+
}
1262+
sort.Ints(counts)
1263+
return
1264+
}
1265+
assert.ElementsMatch(t, []int{1, 2}, getFinalCounts(deployMap, countMap))
12581266

12591267
refreshPod(res, deployMap, mem, stor)
12601268
countMap = map[string]int{
@@ -1268,13 +1276,6 @@ func TestSelectStorageNodesSequence(t *testing.T) {
12681276
assert.Equal(t, 2, len(res))
12691277
assert.Equal(t, 14, res[0].Capacity)
12701278
assert.Equal(t, 15, res[1].Capacity)
1271-
1272-
res, deployMap, err = SelectStorageNodes(k, res, countMap, int64(units.GiB), 1, false)
1273-
assert.NoError(t, err)
1274-
i, j = getLess(res)
1275-
assert.Equal(t, 2, len(res))
1276-
assert.Equal(t, 1, deployMap[res[i].Name])
1277-
assert.Equal(t, 0, res[i].Capacity)
12781279
}
12791280

12801281
func SelectStorageNodes(k *Potassium, scheduleInfos []resourcetypes.ScheduleInfo, countMap map[string]int, storage int64, need int, each bool) ([]resourcetypes.ScheduleInfo, map[string]int, error) {
@@ -1295,7 +1296,7 @@ func SelectStorageNodes(k *Potassium, scheduleInfos []resourcetypes.ScheduleInfo
12951296
for i, scheduleInfo := range scheduleInfos {
12961297
for _, si := range strategyInfos {
12971298
if si.Nodename == scheduleInfo.Name {
1298-
scheduleInfos[i].Capacity = si.Capacity
1299+
scheduleInfos[i].Capacity = si.Capacity - deployMap[si.Nodename]
12991300
}
13001301
}
13011302
}

strategy/average.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@ import (
1212
// AveragePlan deploy workload each node
1313
// 容量够的机器每一台部署 N 个
1414
// need 是每台机器所需总量,limit 是限制节点数
15-
func AveragePlan(strategyInfos []Info, need, total, limit int, resourceType types.ResourceType) (map[string]int, error) {
15+
func AveragePlan(infos []Info, need, total, limit int, resourceType types.ResourceType) (map[string]int, error) {
1616
log.Debugf("[AveragePlan] need %d limit %d", need, limit)
17-
scheduleInfosLength := len(strategyInfos)
17+
scheduleInfosLength := len(infos)
1818
if scheduleInfosLength < limit {
1919
return nil, errors.WithStack(types.NewDetailedErr(types.ErrInsufficientRes,
2020
fmt.Sprintf("node len %d < limit, cannot alloc an average node plan", scheduleInfosLength)))
2121
}
22+
strategyInfos := make([]Info, scheduleInfosLength)
23+
copy(strategyInfos, infos)
2224
sort.Slice(strategyInfos, func(i, j int) bool { return strategyInfos[i].Capacity < strategyInfos[j].Capacity })
2325
p := sort.Search(scheduleInfosLength, func(i int) bool { return strategyInfos[i].Capacity >= need })
2426
if p == scheduleInfosLength {

strategy/average_test.go

+7-13
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package strategy
22

33
import (
4+
"sort"
45
"testing"
56

67
"github.com/projecteru2/core/types"
@@ -10,22 +11,15 @@ import (
1011
func TestAveragePlan(t *testing.T) {
1112
// 正常的
1213
nodes := deployedNodes()
13-
originCap := map[string]int{}
14-
for _, v := range nodes {
15-
originCap[v.Nodename] = v.Capacity
16-
}
1714
r, err := AveragePlan(nodes, 1, 0, 0, types.ResourceAll)
1815
assert.NoError(t, err)
19-
for i := range r {
20-
assert.Equal(t, r[i], 1)
21-
capacity := 0
22-
for _, si := range nodes {
23-
if si.Nodename == i {
24-
capacity = si.Capacity
25-
}
26-
}
27-
assert.Equal(t, capacity, originCap[i]-1)
16+
finalCounts := []int{}
17+
for _, node := range nodes {
18+
finalCounts = append(finalCounts, node.Count+r[node.Nodename])
2819
}
20+
sort.Ints(finalCounts)
21+
assert.ElementsMatch(t, []int{3, 4, 6, 8}, finalCounts)
22+
2923
// nodes len < limit
3024
nodes = deployedNodes()
3125
_, err = AveragePlan(nodes, 100, 0, 5, types.ResourceAll)

strategy/communism.go

+62-37
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,82 @@
11
package strategy
22

33
import (
4+
"container/heap"
45
"fmt"
5-
"sort"
66

77
"github.com/pkg/errors"
8-
"github.com/projecteru2/core/log"
98
"github.com/projecteru2/core/types"
109
)
1110

11+
type infoHeap struct {
12+
infos []Info
13+
limit int
14+
}
15+
16+
func (h infoHeap) Len() int {
17+
return len(h.infos)
18+
}
19+
20+
func (h infoHeap) Less(i, j int) bool {
21+
return h.infos[i].Count < h.infos[j].Count || (h.infos[i].Count == h.infos[j].Count && h.infos[i].Capacity > h.infos[j].Capacity)
22+
}
23+
24+
func (h infoHeap) Swap(i, j int) {
25+
h.infos[i], h.infos[j] = h.infos[j], h.infos[i]
26+
}
27+
28+
func (h *infoHeap) Push(x interface{}) {
29+
info := x.(Info)
30+
if info.Capacity == 0 || (h.limit > 0 && info.Count >= h.limit) {
31+
return
32+
}
33+
h.infos = append(h.infos, info)
34+
}
35+
36+
func (h *infoHeap) Pop() interface{} {
37+
length := len(h.infos)
38+
x := h.infos[length-1]
39+
h.infos = h.infos[0 : length-1]
40+
return x
41+
}
42+
43+
func newInfoHeap(infos []Info, limit int) heap.Interface {
44+
dup := infoHeap{
45+
infos: []Info{},
46+
limit: limit,
47+
}
48+
for _, info := range infos {
49+
if info.Capacity == 0 || (limit > 0 && info.Count >= limit) {
50+
continue
51+
}
52+
dup.infos = append(dup.infos, info)
53+
}
54+
return &dup
55+
}
56+
1257
// CommunismPlan 吃我一记共产主义大锅饭
1358
// 部署完 N 个后全局尽可能平均
14-
func CommunismPlan(arg []Info, need, total, limit int, resourceType types.ResourceType) (map[string]int, error) {
59+
func CommunismPlan(infos []Info, need, total, limit int, resourceType types.ResourceType) (map[string]int, error) {
1560
if total < need {
1661
return nil, errors.WithStack(types.NewDetailedErr(types.ErrInsufficientRes,
1762
fmt.Sprintf("need: %d, vol: %d", need, total)))
1863
}
19-
sort.Slice(arg, func(i, j int) bool { return arg[i].Count < arg[j].Count })
20-
length := len(arg)
2164

22-
deployMap := map[string]int{}
23-
for i := 0; i < length; i++ {
24-
if need <= 0 {
25-
break
26-
}
27-
req := need
28-
if i < length-1 {
29-
req = (arg[i+1].Count - arg[i].Count) * (i + 1)
30-
}
31-
if req > need {
32-
req = need
65+
deploy := map[string]int{}
66+
iHeap := newInfoHeap(infos, limit)
67+
heap.Init(iHeap)
68+
for {
69+
if iHeap.Len() == 0 {
70+
return nil, errors.WithStack(types.ErrInsufficientRes)
3371
}
34-
for j := 0; j < i+1; j++ {
35-
deploy := req / (i + 1 - j)
36-
tail := req % (i + 1 - j)
37-
d := deploy
38-
if tail > 0 {
39-
d++
40-
}
41-
if d > arg[j].Capacity {
42-
d = arg[j].Capacity
43-
}
44-
if d == 0 {
45-
continue
46-
}
47-
arg[j].Capacity -= d
48-
deployMap[arg[j].Nodename] += d
49-
need -= d
50-
req -= d
72+
info := heap.Pop(iHeap).(Info)
73+
deploy[info.Nodename]++
74+
need--
75+
if need == 0 {
76+
return deploy, nil
5177
}
78+
info.Count++
79+
info.Capacity--
80+
heap.Push(iHeap, info)
5281
}
53-
// 这里 need 一定会为 0 出来,因为 volTotal 保证了一定大于 need
54-
// 这里并不需要再次排序了,理论上的排序是基于 Count 得到的 Deploy 最终方案
55-
log.Debugf("[CommunismPlan] strategyInfo: %+v", arg)
56-
return deployMap, nil
5782
}

0 commit comments

Comments
 (0)