-package calcium
+package selfmon

 import (
 	"context"
 	"math/rand"
 	"os/signal"
 	"syscall"
+	"testing"
 	"time"

 	"github.com/pkg/errors"

+	"github.com/projecteru2/core/cluster"
 	"github.com/projecteru2/core/log"
-	coretypes "github.com/projecteru2/core/types"
+	"github.com/projecteru2/core/store"
+	"github.com/projecteru2/core/types"
 	"github.com/projecteru2/core/utils"
 )

@@ -19,22 +22,32 @@ const ActiveKey = "/selfmon/active"

 // NodeStatusWatcher monitors the changes of node status
 type NodeStatusWatcher struct {
-	id  int64
-	cal *Calcium
+	id      int64
+	config  types.Config
+	cluster cluster.Cluster
+	store   store.Store
 }

-// NewNodeStatusWatcher .
-func NewNodeStatusWatcher(cal *Calcium) *NodeStatusWatcher {
+// RunNodeStatusWatcher .
+func RunNodeStatusWatcher(ctx context.Context, config types.Config, cluster cluster.Cluster, t *testing.T) {
 	rand.Seed(time.Now().UnixNano())
 	id := rand.Int63n(10000) // nolint
-	return &NodeStatusWatcher{
-		id:  id,
-		cal: cal,
+	store, err := store.NewStore(config, t)
+	if err != nil {
+		log.Errorf(context.TODO(), "[RunNodeStatusWatcher] %v failed to create store, err: %v", id, err)
+		return
+	}
+
+	watcher := &NodeStatusWatcher{
+		id:      id,
+		config:  config,
+		store:   store,
+		cluster: cluster,
 	}
+	watcher.run(ctx)
 }

-func (n *NodeStatusWatcher) run() {
-	ctx := n.getSignalContext(context.TODO())
+func (n *NodeStatusWatcher) run(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
@@ -45,7 +58,7 @@ func (n *NodeStatusWatcher) run() {
 					log.Errorf(ctx, "[NodeStatusWatcher] %v stops watching, err: %v", n.id, err)
 				}
 			})
-			time.Sleep(n.cal.config.ConnectionTimeout)
+			time.Sleep(n.config.ConnectionTimeout)
 		}
 	}
 }
@@ -90,7 +103,7 @@ func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx
 			if errors.Is(err, context.Canceled) {
 				log.Info("[Register] context canceled")
 				return
-			} else if !errors.Is(err, coretypes.ErrKeyExists) {
+			} else if !errors.Is(err, types.ErrKeyExists) {
 				log.Errorf(ctx, "[Register] failed to re-register: %v", err)
 				time.Sleep(time.Second)
 				continue
@@ -126,20 +139,20 @@ func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx
 }

 func (n *NodeStatusWatcher) register(ctx context.Context) (<-chan struct{}, func(), error) {
-	return n.cal.store.StartEphemeral(ctx, ActiveKey, n.cal.config.HAKeepaliveInterval)
+	return n.store.StartEphemeral(ctx, ActiveKey, n.config.HAKeepaliveInterval)
 }

 func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) {
 	log.Debug(ctx, "[NodeStatusWatcher] init node status started")
-	nodes := make(chan *coretypes.Node)
+	nodes := make(chan *types.Node)

 	go func() {
 		defer close(nodes)
 		// Get all nodes which are active status, and regardless of pod.
 		var err error
-		var ch <-chan *coretypes.Node
-		utils.WithTimeout(ctx, n.cal.config.GlobalTimeout, func(ctx context.Context) {
-			ch, err = n.cal.ListPodNodes(ctx, &coretypes.ListNodesOptions{
+		var ch <-chan *types.Node
+		utils.WithTimeout(ctx, n.config.GlobalTimeout, func(ctx context.Context) {
+			ch, err = n.cluster.ListPodNodes(ctx, &types.ListNodesOptions{
 				Podname: "",
 				Labels:  nil,
 				All:     true,
@@ -161,9 +174,9 @@ func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) {
 	}()

 	for node := range nodes {
-		status, err := n.cal.GetNodeStatus(ctx, node.Name)
+		status, err := n.cluster.GetNodeStatus(ctx, node.Name)
 		if err != nil {
-			status = &coretypes.NodeStatus{
+			status = &types.NodeStatus{
 				Nodename: node.Name,
 				Podname:  node.Podname,
 				Alive:    false,
@@ -178,15 +191,15 @@ func (n *NodeStatusWatcher) monitor(ctx context.Context) error {
 	go n.initNodeStatus(ctx)

 	// monitor node status
-	messageChan := n.cal.NodeStatusStream(ctx)
+	messageChan := n.cluster.NodeStatusStream(ctx)
 	log.Infof(ctx, "[NodeStatusWatcher] %v watch node status started", n.id)
 	defer log.Infof(ctx, "[NodeStatusWatcher] %v stop watching node status", n.id)

 	for {
 		select {
 		case message, ok := <-messageChan:
 			if !ok {
-				return coretypes.ErrMessageChanClosed
+				return types.ErrMessageChanClosed
 			}
 			go n.dealNodeStatusMessage(ctx, message)
 		case <-ctx.Done():
@@ -195,18 +208,18 @@ func (n *NodeStatusWatcher) monitor(ctx context.Context) error {
 	}
 }

-func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *coretypes.NodeStatus) {
+func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *types.NodeStatus) {
 	if message.Error != nil {
 		log.Errorf(ctx, "[NodeStatusWatcher] deal with node status stream message failed %+v", message)
 		return
 	}

 	// TODO maybe we need a distributed lock to control concurrency
-	opts := &coretypes.SetNodeOptions{
+	opts := &types.SetNodeOptions{
 		Nodename:      message.Nodename,
 		WorkloadsDown: !message.Alive,
 	}
-	if _, err := n.cal.SetNode(ctx, opts); err != nil {
+	if _, err := n.cluster.SetNode(ctx, opts); err != nil {
 		log.Errorf(ctx, "[NodeStatusWatcher] set node %s failed %v", message.Nodename, err)
 		return
 	}
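
With this change the watcher no longer reaches into a `*Calcium` instance: it carries its own `types.Config`, `cluster.Cluster`, and `store.Store`, and `run` takes the caller's context instead of building a signal context itself. The sketch below shows one way a caller might wire `RunNodeStatusWatcher` up; it is not part of this diff, and the import path `github.com/projecteru2/core/selfmon`, the `startNodeStatusWatcher` helper, and the use of `signal.NotifyContext` are assumptions.

```go
// Hypothetical caller-side sketch (not part of this diff): run the watcher
// under a signal-aware context that the caller owns.
package example

import (
	"context"
	"os/signal"
	"syscall"

	"github.com/projecteru2/core/cluster"
	"github.com/projecteru2/core/selfmon" // assumed import path for the renamed package
	"github.com/projecteru2/core/types"
)

func startNodeStatusWatcher(config types.Config, cl cluster.Cluster) {
	// Cancellation now belongs to the caller; SIGINT/SIGTERM stop the watcher.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Blocks until ctx is done. The *testing.T argument is presumably only
	// meaningful in tests (assumption); pass nil in normal use.
	selfmon.RunNodeStatusWatcher(ctx, config, cl, nil)
}
```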