-package calcium
+package selfmon
 
 import (
 	"context"
 	"math/rand"
-	"os/signal"
-	"syscall"
+	"testing"
 	"time"
 
 	"github.com/pkg/errors"
 
+	"github.com/projecteru2/core/cluster"
 	"github.com/projecteru2/core/log"
-	coretypes "github.com/projecteru2/core/types"
+	"github.com/projecteru2/core/store"
+	"github.com/projecteru2/core/types"
 	"github.com/projecteru2/core/utils"
 )
 
@@ -19,22 +20,32 @@ const ActiveKey = "/selfmon/active"
 
 // NodeStatusWatcher monitors the changes of node status
 type NodeStatusWatcher struct {
-	id  int64
-	cal *Calcium
+	id      int64
+	config  types.Config
+	cluster cluster.Cluster
+	store   store.Store
 }
 
-// NewNodeStatusWatcher .
-func NewNodeStatusWatcher(cal *Calcium) *NodeStatusWatcher {
+// RunNodeStatusWatcher .
+func RunNodeStatusWatcher(ctx context.Context, config types.Config, cluster cluster.Cluster, t *testing.T) {
 	rand.Seed(time.Now().UnixNano())
 	id := rand.Int63n(10000) // nolint
-	return &NodeStatusWatcher{
-		id:  id,
-		cal: cal,
+	store, err := store.NewStore(config, t)
+	if err != nil {
+		log.Errorf(context.TODO(), "[RunNodeStatusWatcher] %v failed to create store, err: %v", id, err)
+		return
+	}
+
+	watcher := &NodeStatusWatcher{
+		id:      id,
+		config:  config,
+		store:   store,
+		cluster: cluster,
 	}
+	watcher.run(ctx)
 }
 
-func (n *NodeStatusWatcher) run() {
-	ctx := n.getSignalContext(context.TODO())
+func (n *NodeStatusWatcher) run(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
@@ -45,22 +56,11 @@ func (n *NodeStatusWatcher) run() {
 					log.Errorf(ctx, "[NodeStatusWatcher] %v stops watching, err: %v", n.id, err)
 				}
 			})
-			time.Sleep(n.cal.config.ConnectionTimeout)
+			time.Sleep(n.config.ConnectionTimeout)
 		}
 	}
 }
 
-func (n *NodeStatusWatcher) getSignalContext(ctx context.Context) context.Context {
-	exitCtx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-	go func() {
-		defer cancel()
-		<-exitCtx.Done()
-		log.Warnf(ctx, "[NodeStatusWatcher] watcher %v receives a signal to exit", n.id)
-	}()
-
-	return exitCtx
-}
-
 // withActiveLock acquires the active lock synchronously
 func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx context.Context)) {
 	ctx, cancel := context.WithCancel(parentCtx)
@@ -90,7 +90,7 @@ func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx
 		if errors.Is(err, context.Canceled) {
 			log.Info("[Register] context canceled")
 			return
-		} else if !errors.Is(err, coretypes.ErrKeyExists) {
+		} else if !errors.Is(err, types.ErrKeyExists) {
 			log.Errorf(ctx, "[Register] failed to re-register: %v", err)
 			time.Sleep(time.Second)
 			continue
@@ -126,20 +126,20 @@ func (n *NodeStatusWatcher) withActiveLock(parentCtx context.Context, f func(ctx
 }
 
 func (n *NodeStatusWatcher) register(ctx context.Context) (<-chan struct{}, func(), error) {
-	return n.cal.store.StartEphemeral(ctx, ActiveKey, n.cal.config.HAKeepaliveInterval)
+	return n.store.StartEphemeral(ctx, ActiveKey, n.config.HAKeepaliveInterval)
 }
 
 func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) {
 	log.Debug(ctx, "[NodeStatusWatcher] init node status started")
-	nodes := make(chan *coretypes.Node)
+	nodes := make(chan *types.Node)
 
 	go func() {
 		defer close(nodes)
 		// Get all nodes which are active status, and regardless of pod.
 		var err error
-		var ch <-chan *coretypes.Node
-		utils.WithTimeout(ctx, n.cal.config.GlobalTimeout, func(ctx context.Context) {
-			ch, err = n.cal.ListPodNodes(ctx, &coretypes.ListNodesOptions{
+		var ch <-chan *types.Node
+		utils.WithTimeout(ctx, n.config.GlobalTimeout, func(ctx context.Context) {
+			ch, err = n.cluster.ListPodNodes(ctx, &types.ListNodesOptions{
 				Podname: "",
 				Labels:  nil,
 				All:     true,
@@ -161,9 +161,9 @@ func (n *NodeStatusWatcher) initNodeStatus(ctx context.Context) {
 	}()
 
 	for node := range nodes {
-		status, err := n.cal.GetNodeStatus(ctx, node.Name)
+		status, err := n.cluster.GetNodeStatus(ctx, node.Name)
 		if err != nil {
-			status = &coretypes.NodeStatus{
+			status = &types.NodeStatus{
 				Nodename: node.Name,
 				Podname:  node.Podname,
 				Alive:    false,
@@ -178,15 +178,15 @@ func (n *NodeStatusWatcher) monitor(ctx context.Context) error {
 	go n.initNodeStatus(ctx)
 
 	// monitor node status
-	messageChan := n.cal.NodeStatusStream(ctx)
+	messageChan := n.cluster.NodeStatusStream(ctx)
 	log.Infof(ctx, "[NodeStatusWatcher] %v watch node status started", n.id)
 	defer log.Infof(ctx, "[NodeStatusWatcher] %v stop watching node status", n.id)
 
 	for {
 		select {
 		case message, ok := <-messageChan:
 			if !ok {
-				return coretypes.ErrMessageChanClosed
+				return types.ErrMessageChanClosed
 			}
 			go n.dealNodeStatusMessage(ctx, message)
 		case <-ctx.Done():
@@ -195,18 +195,18 @@ func (n *NodeStatusWatcher) monitor(ctx context.Context) error {
 	}
 }
 
-func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *coretypes.NodeStatus) {
+func (n *NodeStatusWatcher) dealNodeStatusMessage(ctx context.Context, message *types.NodeStatus) {
 	if message.Error != nil {
 		log.Errorf(ctx, "[NodeStatusWatcher] deal with node status stream message failed %+v", message)
 		return
 	}
 
 	// TODO maybe we need a distributed lock to control concurrency
-	opts := &coretypes.SetNodeOptions{
+	opts := &types.SetNodeOptions{
 		Nodename:      message.Nodename,
 		WorkloadsDown: !message.Alive,
 	}
-	if _, err := n.cal.SetNode(ctx, opts); err != nil {
+	if _, err := n.cluster.SetNode(ctx, opts); err != nil {
 		log.Errorf(ctx, "[NodeStatusWatcher] set node %s failed %v", message.Nodename, err)
 		return
 	}
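Since this change removes getSignalContext and makes run accept a caller-supplied context, whoever starts the watcher now owns its lifecycle. Below is a minimal caller-side sketch of that wiring; the package name daemon, the selfmon import path, the startWatcher helper, and passing nil for the *testing.T parameter outside tests are illustrative assumptions, not part of this diff.

package daemon

import (
	"context"
	"os/signal"
	"syscall"

	"github.com/projecteru2/core/cluster"
	"github.com/projecteru2/core/selfmon"
	"github.com/projecteru2/core/types"
)

// startWatcher wires a signal-aware context (same signal set the old
// getSignalContext watched) and hands it to the new entry point.
func startWatcher(config types.Config, cal cluster.Cluster) {
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	defer cancel()

	// The trailing *testing.T only feeds store.NewStore for test stores;
	// passing nil here is an assumption about production usage.
	selfmon.RunNodeStatusWatcher(ctx, config, cal, nil)
}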