Skip to content

Commit 3582022

Browse files
authored
when remove node, clean up engine cache & remove prometheus metrics (#615)
1 parent 9d7defb commit 3582022

File tree

3 files changed

+85
-1
lines changed

3 files changed

+85
-1
lines changed

cluster/calcium/node.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
enginefactory "github.com/projecteru2/core/engine/factory"
88
enginetypes "github.com/projecteru2/core/engine/types"
99
"github.com/projecteru2/core/log"
10+
"github.com/projecteru2/core/metrics"
1011
"github.com/projecteru2/core/resource/plugins"
1112
resourcetypes "github.com/projecteru2/core/resource/types"
1213
"github.com/projecteru2/core/types"
@@ -89,7 +90,12 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error {
8990
},
9091
// then: remove node resource metadata
9192
func(ctx context.Context) error {
92-
return c.rmgr.RemoveNode(ctx, nodename)
93+
if err = c.rmgr.RemoveNode(ctx, nodename); err != nil {
94+
return err
95+
}
96+
enginefactory.RemoveEngineFromCache(ctx, node.Endpoint, node.Ca, node.Cert, node.Key)
97+
metrics.Client.RemoveInvalidNodes([]string{nodename})
98+
return nil
9399
},
94100
// rollback: do nothing
95101
func(ctx context.Context, failureByCond bool) error {

metrics/handler.go

+3
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,17 @@ func (m *Metrics) ResourceMiddleware(cluster cluster.Cluster) func(http.Handler)
2020
if err != nil {
2121
logger.Error(ctx, err, "Get all nodes err")
2222
}
23+
activeNodes := make(map[string]*types.Node, 0)
2324
for node := range nodes {
2425
metrics, err := m.rmgr.GetNodeMetrics(ctx, node)
2526
if err != nil {
2627
logger.Error(ctx, err, "Get metrics failed")
2728
continue
2829
}
30+
activeNodes[node.Name] = node
2931
m.SendMetrics(ctx, metrics...)
3032
}
33+
m.DeleteInactiveNodesWithCache(ctx, activeNodes)
3134
h.ServeHTTP(w, r)
3235
})
3336
}

metrics/metrics.go

+75
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@ import (
66
"strconv"
77
"sync"
88

9+
enginefactory "github.com/projecteru2/core/engine/factory"
910
"github.com/projecteru2/core/log"
1011
"github.com/projecteru2/core/resource"
1112
"github.com/projecteru2/core/resource/cobalt"
1213
plugintypes "github.com/projecteru2/core/resource/plugins/types"
1314
"github.com/projecteru2/core/types"
1415
"github.com/projecteru2/core/utils"
16+
io_prometheus_client "github.com/prometheus/client_model/go"
17+
"golang.org/x/exp/slices"
1518

1619
statsdlib "github.com/CMGS/statsd"
1720
"github.com/prometheus/client_golang/prometheus"
@@ -85,6 +88,78 @@ func (m *Metrics) SendMetrics(ctx context.Context, metrics ...*plugintypes.Metri
8588
}
8689
}
8790

91+
func (m *Metrics) DeleteInactiveNodesWithCache(ctx context.Context, activeNodesMap map[string]*types.Node) {
92+
metricNodeNameMap := m.getNodeNameMapFromMetrics()
93+
// 计算差集
94+
invalidNodes := make([]string, 0)
95+
for nodeName := range metricNodeNameMap {
96+
if node, exists := activeNodesMap[nodeName]; !exists {
97+
invalidNodes = append(invalidNodes, nodeName)
98+
enginefactory.RemoveEngineFromCache(ctx, node.Endpoint, node.Ca, node.Cert, node.Key)
99+
}
100+
}
101+
m.RemoveInvalidNodes(invalidNodes)
102+
}
103+
104+
func (m *Metrics) getNodeNameMapFromMetrics() map[string]bool {
105+
metrics, _ := prometheus.DefaultGatherer.Gather()
106+
nodeNameMap := make(map[string]bool, 0)
107+
for _, metric := range metrics {
108+
for _, mf := range metric.GetMetric() {
109+
if len(mf.Label) == 0 {
110+
continue
111+
}
112+
for _, label := range mf.Label {
113+
if label.GetName() == "nodename" {
114+
nodeNameMap[label.GetValue()] = true
115+
break
116+
}
117+
}
118+
}
119+
}
120+
return nodeNameMap
121+
}
122+
123+
// RemoveInvalidNodes 清除多余的metric标签值
124+
func (m *Metrics) RemoveInvalidNodes(invalidNodes []string) {
125+
if len(invalidNodes) == 0 {
126+
return
127+
}
128+
for _, collector := range m.Collectors {
129+
if collector == nil {
130+
return
131+
}
132+
metrics, _ := prometheus.DefaultGatherer.Gather()
133+
for _, metric := range metrics {
134+
for _, mf := range metric.GetMetric() {
135+
if len(mf.Label) == 0 {
136+
continue
137+
}
138+
139+
if !slices.ContainsFunc(mf.Label, func(label *io_prometheus_client.LabelPair) bool {
140+
return label.GetName() == "nodename" && slices.ContainsFunc(invalidNodes, func(nodename string) bool {
141+
return label.GetValue() == nodename
142+
})
143+
}) {
144+
continue
145+
}
146+
labels := prometheus.Labels{}
147+
for _, label := range mf.Label {
148+
labels[label.GetName()] = label.GetValue()
149+
}
150+
// 删除符合条件的度量标签
151+
switch c := collector.(type) {
152+
case *prometheus.GaugeVec:
153+
c.Delete(labels)
154+
case *prometheus.CounterVec:
155+
c.Delete(labels)
156+
}
157+
}
158+
}
159+
// 添加更多的条件来处理其他类型的Collector
160+
}
161+
}
162+
88163
// Lazy connect
89164
func (m *Metrics) checkConn(ctx context.Context) error {
90165
if m.statsdClient != nil {

0 commit comments

Comments
 (0)