
Commit 32f8bb4

Author: zhangye (committed)

Merge branch 'debug' into 'master'

add statsd: Eru-core will send the memcap of each concerned node before and after container deployment. @platform

See merge request !36

2 parents fa098e3 + 9db7270, commit 32f8bb4

9 files changed: +235 −112

VERSION
+1 −1

@@ -1 +1 @@
-0.7.11
+0.7.12

cluster/calcium/create_container.go
+9

@@ -43,6 +43,7 @@ func (c *calcium) createContainerWithCPUPeriod(specs types.Specs, opts *types.De
 		log.Errorf("Got error %v after getCPUAndMem", err)
 		return ch, err
 	}
+	go utils.SendMemCap(cpuandmem, "before_alloc")
 	nodesInfo := utils.GetNodesInfo(cpuandmem)
 
 	cpuQuota := int(opts.CPUQuota * float64(utils.CpuPeriodBase))
@@ -188,6 +189,14 @@ func (c *calcium) doCreateContainerWithCPUPeriod(nodename string, connum int, qu
 		}
 	}
+	go func(podname string, nodename string) {
+		cpuandmem, _, err := c.getCPUAndMem(podname, nodename, 1.0)
+		if err != nil {
+			log.Errorf("Got error %v after getCPUAndMem", err)
+			return
+		}
+		utils.SendMemCap(cpuandmem, "after_alloc")
+	}(opts.Podname, opts.Nodename)
 	return ms
 }

core.yaml.sample
+1

@@ -6,6 +6,7 @@ etcd:
 etcd_lock_prefix: "/eru-core/_lock"
 
 resource_alloc: "cpu-period"
+statsd: "statsd2.ricebook.net:8125"
 
 git:
     public_key: "***REMOVED***"

g/statsd.go
+37

@@ -0,0 +1,37 @@
+package g
+
+import (
+	"fmt"
+
+	statsdlib "github.com/CMGS/statsd"
+	log "github.com/Sirupsen/logrus"
+)
+
+type StatsDClient struct {
+	Addr string
+}
+
+func (self *StatsDClient) Close() error {
+	return nil
+}
+
+func (self *StatsDClient) Send(data map[string]float64, endpoint, tag string) error {
+	remote, err := statsdlib.New(self.Addr)
+	if err != nil {
+		log.Errorf("Connect statsd failed: %v", err)
+		return err
+	}
+	defer remote.Close()
+	defer remote.Flush()
+	for k, v := range data {
+		key := fmt.Sprintf("eru-core.%s.%s.%s", endpoint, tag, k)
+		remote.Gauge(key, v)
+	}
+	return nil
+}
+
+var Statsd = StatsDClient{}
+
+func NewStatsdClient(addr string) {
+	Statsd = StatsDClient{addr}
+}
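To make the new package concrete, here is a minimal usage sketch (not part of the commit). The address, hostname, node name, and value are made up; it simply mirrors how main.go and utils.SendMemCap wire the client together later in this diff.

package main

import "gitlab.ricebook.net/platform/core/g"

func main() {
	// Point the package-level client at the statsd server; in eru-core this
	// address comes from config.Statsd (see the main.go hunk below).
	g.NewStatsdClient("statsd2.ricebook.net:8125")

	// One gauge per node; g/statsd.go builds keys as
	// "eru-core.<endpoint>.<tag>.<node>", e.g. "eru-core.core-host-1.before_alloc.node-1".
	data := map[string]float64{"node-1": 68719476736}
	if err := g.Statsd.Send(data, "core-host-1", "before_alloc"); err != nil {
		// Send already logs connection failures; nothing more to do here.
		return
	}
}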

main.go
+3

@@ -13,6 +13,7 @@ import (
 	log "github.com/Sirupsen/logrus"
 	"github.com/codegangsta/cli"
 	"gitlab.ricebook.net/platform/core/cluster/calcium"
+	"gitlab.ricebook.net/platform/core/g"
 	"gitlab.ricebook.net/platform/core/rpc"
 	"gitlab.ricebook.net/platform/core/rpc/gen"
 	"gitlab.ricebook.net/platform/core/types"
@@ -84,6 +85,8 @@ func serve() {
 		log.Fatal(err)
 	}
 
+	g.NewStatsdClient(config.Statsd)
+
 	cluster, err := calcium.New(config)
 	if err != nil {
 		log.Fatal(err)

types/config.go
+1

@@ -8,6 +8,7 @@ type Config struct {
 	EtcdMachines   []string `yaml:"etcd"`             // etcd cluster addresses
 	EtcdLockPrefix string   `yaml:"etcd_lock_prefix"` // etcd lock prefix, all locks will be created under this dir
 	ResourceAlloc  string   `yaml:"resource_alloc"`   // scheduler or cpu-period TODO give it a good name
+	Statsd         string   `yaml:"statsd"`           // Statsd host and port
 
 	Git    GitConfig    `yaml:"git"`
 	Docker DockerConfig `yaml:"docker"`

utils/alloc_plan.go
+136

@@ -0,0 +1,136 @@
+package utils
+
+import (
+	log "github.com/Sirupsen/logrus"
+	"gitlab.ricebook.net/platform/core/types"
+
+	"fmt"
+	"sort"
+)
+
+type NodeInfo struct {
+	Name    string
+	CorePer int
+	Memory  int64
+}
+
+type ByCoreNum []NodeInfo
+
+func (a ByCoreNum) Len() int           { return len(a) }
+func (a ByCoreNum) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+func (a ByCoreNum) Less(i, j int) bool { return a[i].CorePer < a[j].CorePer }
+
+func AllocContainerPlan(nodeInfo ByCoreNum, quota int, memory int64, count int) (map[string]int, error) {
+	log.Debugf("[AllocContainerPlan]: nodeInfo: %v, quota: %d, memory: %d, count: %d", nodeInfo, quota, memory, count)
+
+	result := make(map[string]int)
+	N := nodeInfo.Len()
+	firstNodeWithEnoughCPU := -1
+
+	for i := 0; i < N; i++ {
+		if nodeInfo[i].CorePer >= quota {
+			firstNodeWithEnoughCPU = i
+			break
+		}
+	}
+	if firstNodeWithEnoughCPU == -1 {
+		log.Errorf("[AllocContainerPlan] Cannot alloc a plan, not enough cpu quota, got flag %d", firstNodeWithEnoughCPU)
+		return result, fmt.Errorf("[AllocContainerPlan] Cannot alloc a plan, not enough cpu quota")
+	}
+	log.Debugf("[AllocContainerPlan] the %d th node has enough cpu quota.", firstNodeWithEnoughCPU)
+
+	// Check whether there is enough memory to meet the demand
+	nodeInfoList := []NodeInfo{}
+	volTotal := 0
+	volEachNode := []int{} // kept separately for sorting
+	for i := firstNodeWithEnoughCPU; i < N; i++ {
+		temp := int(nodeInfo[i].Memory / memory)
+		if temp > 0 {
+			volTotal += temp
+			nodeInfoList = append(nodeInfoList, nodeInfo[i])
+			volEachNode = append(volEachNode, temp)
+		}
+	}
+	if volTotal < count {
+		log.Errorf("[AllocContainerPlan] Cannot alloc a plan, volume %d, count %d", volTotal, count)
+		return result, fmt.Errorf("[AllocContainerPlan] Cannot alloc a plan, not enough memory.")
+	}
+	log.Debugf("[AllocContainerPlan] volume of each node: %v", volEachNode)
+
+	sort.Ints(volEachNode)
+	log.Debugf("[AllocContainerPlan] sorted volume: %v", volEachNode)
+	plan, err := allocAlgorithm(volEachNode, count)
+	if err != nil {
+		log.Errorf("[AllocContainerPlan] %v", err)
+		return result, err
+	}
+
+	for i, num := range plan {
+		key := nodeInfoList[i].Name
+		result[key] = num
+	}
+	log.Debugf("[AllocContainerPlan] allocAlgorithm result: %v", result)
+	return result, nil
+}
+
+func allocAlgorithm(info []int, need int) (map[int]int, error) {
+	// This is essentially the allocation algorithm used for exact allocation.
+	// The scenario is the same: we know how many containers each machine can hold,
+	// and we want to spread them as evenly as possible.
+	// Its correctness was verified before, so it was copied over.
+	result := make(map[int]int)
+	nnode := len(info)
+
+	var nodeToUse, more int
+	for i := 0; i < nnode; i++ {
+		nodeToUse = nnode - i
+		ave := need / nodeToUse
+		if ave > info[i] {
+			ave = 1
+		}
+		for ; ave < info[i] && ave*nodeToUse < need; ave++ {
+		}
+		log.Debugf("[AllocContainerPlan] allocAlgorithm outer loop: %d, ave: %d, result: %v", i, ave, result)
+		more = ave*nodeToUse - need
+		for j := i; nodeToUse != 0; nodeToUse-- {
+			if _, ok := result[j]; !ok {
+				result[j] = ave
+			} else {
+				result[j] += ave
+			}
+			if more > 0 {
+				// TODO: there should be a random strategy here, but my previous random strategy was buggy.
+				// cmgs suggested an approach: if the number of containers left to allocate is
+				// smaller than the capacity of the smallest machine, just distribute them randomly.
+				more--
+				result[j]--
+			} else if more < 0 {
+				info[j] -= ave
+			}
+			j++
+			log.Debugf("[AllocContainerPlan] allocAlgorithm inner loop: %d, ave: %d, result: %v", j, ave, result)
+		}
+		if more == 0 {
+			break
+		}
+		need = -more
+	}
+	log.Debugf("[AllocContainerPlan] allocAlgorithm info %v, need %d, made plan: %v", info, need, result)
+	for _, v := range result {
+		if v < 0 {
+			// result will not be nil in this situation, so return nil instead of result
+			return nil, fmt.Errorf("allocAlgorithm illegal alloc plan: %v ", result)
+		}
+	}
+	return result, nil
+}
+
+func GetNodesInfo(cpumemmap map[string]types.CPUAndMem) ByCoreNum {
+	result := ByCoreNum{}
+	for node, cpuandmem := range cpumemmap {
+		result = append(result, NodeInfo{node, len(cpuandmem.CpuMap) * CpuPeriodBase, cpuandmem.MemCap})
+	}
+	sort.Sort(result)
+	return result
+}
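For orientation, a rough call sketch of the two exported helpers above (not part of the commit). The container count, quota, and memory figures are illustrative, and cpuandmem is assumed to be the map returned by calcium's getCPUAndMem, as in create_container.go.

// cpuandmem: map[string]types.CPUAndMem keyed by node name (from getCPUAndMem).
nodesInfo := utils.GetNodesInfo(cpuandmem) // ByCoreNum, sorted by CorePer ascending

// Ask for 4 containers, each wanting one full core of quota and 512 MiB of memory.
plan, err := utils.AllocContainerPlan(nodesInfo, utils.CpuPeriodBase, 512*1024*1024, 4)
if err != nil {
	log.Errorf("no feasible plan: %v", err)
	return
}
for nodename, num := range plan { // e.g. {"node-1": 3, "node-2": 1}
	log.Infof("deploy %d container(s) on %s", num, nodename)
}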

utils/utils.go
+14 −111

@@ -5,11 +5,11 @@ import (
 	"fmt"
 	"math/big"
 	"os"
-	"sort"
 	"strings"
 
 	log "github.com/Sirupsen/logrus"
 	engineapi "github.com/docker/engine-api/client"
+	"gitlab.ricebook.net/platform/core/g"
 	"gitlab.ricebook.net/platform/core/types"
 	"golang.org/x/net/context"
 )
@@ -97,116 +97,6 @@ func SaveFile(content, path string, mode os.FileMode) error {
 	return err
 }
 
-type NodeInfo struct {
-	Name    string
-	CorePer int
-	Memory  int64
-}
-
-type ByCoreNum []NodeInfo
-
-func (a ByCoreNum) Len() int           { return len(a) }
-func (a ByCoreNum) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
-func (a ByCoreNum) Less(i, j int) bool { return a[i].CorePer < a[j].CorePer }
-
-func AllocContainerPlan(nodeInfo ByCoreNum, quota int, memory int64, count int) (map[string]int, error) {
-	result := make(map[string]int)
-	N := nodeInfo.Len()
-	flag := -1
-
-	for i := 0; i < N; i++ {
-		if nodeInfo[i].CorePer >= quota {
-			flag = i
-			break
-		}
-	}
-	if flag == -1 {
-		log.Errorf("Cannot alloc a plan, not enough cpu quota, got flat %d", flag)
-		return result, fmt.Errorf("Cannot alloc a plan, not enough cpu quota")
-	}
-
-	// Check whether there is enough memory to meet the demand
-	bucket := []NodeInfo{}
-	volume := 0
-	volNum := []int{} // kept separately for sorting
-	for i := flag; i < N; i++ {
-		temp := int(nodeInfo[i].Memory / memory)
-		if temp > 0 {
-			volume += temp
-			bucket = append(bucket, nodeInfo[i])
-			volNum = append(volNum, temp)
-		}
-	}
-	if volume < count {
-		log.Errorf("Cannot alloc a plan, volume %d, count %d", volume, count)
-		return result, fmt.Errorf("Cannot alloc a plan, not enough memory.")
-	}
-
-	sort.Ints(volNum)
-	plan := allocAlgorithm(volNum, count)
-
-	for i, num := range plan {
-		key := bucket[i].Name
-		result[key] = num
-	}
-	log.Debugf("AllocContainerPlan: nodeInfo %v, quota %d, memory %d, count %d, made plan %v", nodeInfo, quota, memory, result)
-	return result, nil
-}
-
-func allocAlgorithm(info []int, need int) map[int]int {
-	// This is essentially the allocation algorithm used for exact allocation.
-	// The scenario is the same: we know how many containers each machine can hold,
-	// and we want to spread them as evenly as possible.
-	// Its correctness was verified before, so it was copied over.
-	result := make(map[int]int)
-	nnode := len(info)
-
-	var nodeToUse, more int
-	for i := 0; i < nnode; i++ {
-		nodeToUse = nnode - i
-		ave := need / nodeToUse
-		if ave > info[i] {
-			ave = 1
-		}
-		for ; ave < info[i] && ave*nodeToUse < need; ave++ {
-		}
-		more = ave*nodeToUse - need
-		for j := i; nodeToUse != 0; nodeToUse-- {
-			if _, ok := result[j]; !ok {
-				result[j] = ave
-			} else {
-				result[j] += ave
-			}
-			if more > 0 {
-				// TODO: there should be a random strategy here, but my previous random strategy was buggy.
-				// cmgs suggested an approach: if the number of containers left to allocate is
-				// smaller than the capacity of the smallest machine, just distribute them randomly.
-				more--
-				result[j]--
-			} else if more < 0 {
-				info[j] -= ave
-			}
-			j++
-		}
-		if more == 0 {
-			break
-		}
-		need = -more
-	}
-	log.Debugf("allocAlgorithm: info %v, need %d, made plan %v", info, need, result)
-	return result
-}
-
-func GetNodesInfo(cpumap map[string]types.CPUAndMem) ByCoreNum {
-	result := ByCoreNum{}
-	for node, cpuandmem := range cpumap {
-		result = append(result, NodeInfo{node, len(cpuandmem.CpuMap) * CpuPeriodBase, cpuandmem.MemCap})
-	}
-	sort.Sort(result)
-	return result
-}
-
 // copied from https://gist.github.com/jmervine/d88c75329f98e09f5c87
 func safeSplit(s string) []string {
 	split := strings.Split(s, " ")
@@ -247,3 +137,16 @@ func MakeCommandLineArgs(s string) []string {
 	}
 	return r
 }
+
+func SendMemCap(cpumemmap map[string]types.CPUAndMem, tag string) {
+	data := map[string]float64{}
+	for node, cpuandmem := range cpumemmap {
+		data[node] = float64(cpuandmem.MemCap)
+	}
+	host := os.Getenv("HOSTNAME")
+	err := g.Statsd.Send(data, host, tag)
+	if err != nil {
+		log.Errorf("Error occurred while sending data to statsd: %v", err)
+	}
+	return
+}
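Putting SendMemCap together with g/statsd.go: each call emits one gauge per node, keyed eru-core.<HOSTNAME>.<tag>.<node>, with the node's MemCap as the value. An illustrative pair of samples (hostname, node name, and values are made up):

eru-core.core-host-1.before_alloc.node-1 = 68719476736
eru-core.core-host-1.after_alloc.node-1  = 67645734912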
