Skip to content

Commit 941cd6f

Browse files
committed
delete the node status if a node is down
1 parent 0be8dba commit 941cd6f

File tree

22 files changed

+73
-54
lines changed

22 files changed

+73
-54
lines changed

3rdmocks/ServerStream.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cluster/calcium/node.go

+7
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,15 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
7373
n.Bypass = (opts.BypassOpt == types.TriTrue) || (opts.BypassOpt == types.TriKeep && n.Bypass)
7474
if n.IsDown() {
7575
logger.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
76+
// remove node status
77+
err := c.store.SetNodeStatus(ctx, node, false, 0)
78+
if err != nil {
79+
// don't return here
80+
log.Errorf(ctx, "[SetNode] failed to set node status, err: %v", err)
81+
}
7682
}
7783
if opts.WorkloadsDown {
84+
// set all workloads as down
7885
workloads, err := c.store.ListNodeWorkloads(ctx, opts.Nodename, nil)
7986
if err != nil {
8087
return logger.Err(ctx, errors.WithStack(err))

cluster/calcium/node_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ func TestSetNode(t *testing.T) {
141141
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
142142
lock.On("Unlock", mock.Anything).Return(nil)
143143

144+
store.On("SetNodeStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(types.ErrNoETCD)
144145
// fail by validating
145146
_, err := c.SetNode(ctx, &types.SetNodeOptions{Nodename: ""})
146147
assert.Error(t, err)
@@ -170,9 +171,7 @@ func TestSetNode(t *testing.T) {
170171
assert.Error(t, err)
171172
workloads := []*types.Workload{{Name: "wrong_name"}, {Name: "a_b_c"}}
172173
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(workloads, nil)
173-
store.On("SetWorkloadStatus",
174-
mock.Anything, mock.Anything, mock.Anything,
175-
).Return(types.ErrNoETCD)
174+
store.On("SetWorkloadStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil)
176175
_, err = c.SetNode(ctx, &types.SetNodeOptions{Nodename: "test", StatusOpt: 0, WorkloadsDown: true})
177176
assert.NoError(t, err)
178177
// test modify

cluster/calcium/status.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ func (c *Calcium) WorkloadStatusStream(ctx context.Context, appname, entrypoint,
6565

6666
// SetNodeStatus set status of a node
6767
// it's used to report whether a node is still alive
68-
func (c *Calcium) SetNodeStatus(ctx context.Context, nodename string, ttl int64) error {
68+
func (c *Calcium) SetNodeStatus(ctx context.Context, nodename string, alive bool, ttl int64) error {
6969
logger := log.WithField("Calcium", "SetNodeStatus").WithField("nodename", nodename).WithField("ttl", ttl)
7070
node, err := c.store.GetNode(ctx, nodename)
7171
if err != nil {
7272
return logger.Err(ctx, errors.WithStack(err))
7373
}
74-
return logger.Err(ctx, errors.WithStack(c.store.SetNodeStatus(ctx, node, ttl)))
74+
return logger.Err(ctx, errors.WithStack(c.store.SetNodeStatus(ctx, node, alive, ttl)))
7575
}
7676

7777
// GetNodeStatus set status of a node

cluster/calcium/status_test.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -96,22 +96,24 @@ func TestSetNodeStatus(t *testing.T) {
9696
}
9797
// failed
9898
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrBadCount).Once()
99-
assert.Error(c.SetNodeStatus(ctx, node.Name, 10))
99+
assert.Error(c.SetNodeStatus(ctx, node.Name, true, 10))
100100
store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil)
101101
// failed by SetWorkloadStatus
102102
store.On("SetNodeStatus",
103103
mock.Anything,
104104
mock.Anything,
105105
mock.Anything,
106+
mock.Anything,
106107
).Return(types.ErrBadCount).Once()
107-
assert.Error(c.SetNodeStatus(ctx, node.Name, 10))
108+
assert.Error(c.SetNodeStatus(ctx, node.Name, true, 10))
108109
// success
109110
store.On("SetNodeStatus",
110111
mock.Anything,
111112
mock.Anything,
112113
mock.Anything,
114+
mock.Anything,
113115
).Return(nil)
114-
assert.NoError(c.SetNodeStatus(ctx, node.Name, 10))
116+
assert.NoError(c.SetNodeStatus(ctx, node.Name, true, 10))
115117
}
116118

117119
func TestGetNodeStatus(t *testing.T) {

cluster/cluster.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type Cluster interface {
5555
ListPodNodes(ctx context.Context, podname string, labels map[string]string, all bool) ([]*types.Node, error)
5656
GetNode(ctx context.Context, nodename string) (*types.Node, error)
5757
SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error)
58-
SetNodeStatus(ctx context.Context, nodename string, ttl int64) error
58+
SetNodeStatus(ctx context.Context, nodename string, alive bool, ttl int64) error
5959
GetNodeStatus(ctx context.Context, nodename string) (*types.NodeStatus, error)
6060
NodeStatusStream(ctx context.Context) chan *types.NodeStatus
6161
// node resource

cluster/mocks/Cluster.go

+6-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/docker/mocks/APIClient.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lock/mocks/DistributedLock.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rpc/mocks/CoreRPC_RunAndWaitServer.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rpc/rpc.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func (v *Vibranium) SetNode(ctx context.Context, opts *pb.SetNodeOptions) (*pb.N
224224

225225
// SetNodeStatus set status of a node for reporting
226226
func (v *Vibranium) SetNodeStatus(ctx context.Context, opts *pb.SetNodeStatusOptions) (*pb.Empty, error) {
227-
if err := v.cluster.SetNodeStatus(ctx, opts.Nodename, opts.Ttl); err != nil {
227+
if err := v.cluster.SetNodeStatus(ctx, opts.Nodename, true, opts.Ttl); err != nil {
228228
return nil, grpcstatus.Error(SetNodeStatus, err.Error())
229229
}
230230
return &pb.Empty{}, nil

scheduler/mocks/Scheduler.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

source/mocks/Source.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

store/etcdv3/meta/mocks/ETCDClientV3.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

store/etcdv3/meta/mocks/KV.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

store/etcdv3/meta/mocks/Txn.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

store/etcdv3/node.go

+13-7
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,16 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels
292292
// SetNodeStatus sets status for a node, value will expire after ttl seconds
293293
// ttl should be larger than 0
294294
// this is heartbeat of node
295-
func (m *Mercury) SetNodeStatus(ctx context.Context, node *types.Node, ttl int64) error {
295+
func (m *Mercury) SetNodeStatus(ctx context.Context, node *types.Node, alive bool, ttl int64) error {
296+
// nodenames are unique
297+
statusKey := filepath.Join(nodeStatusPrefix, node.Name)
298+
entityKey := fmt.Sprintf(nodeInfoKey, node.Name)
299+
300+
if !alive {
301+
_, err := m.Delete(ctx, statusKey)
302+
return err
303+
}
304+
296305
if ttl <= 0 {
297306
return types.ErrNodeStatusTTL
298307
}
@@ -306,9 +315,7 @@ func (m *Mercury) SetNodeStatus(ctx context.Context, node *types.Node, ttl int64
306315
return err
307316
}
308317

309-
// nodenames are unique
310-
statusKey := filepath.Join(nodeStatusPrefix, node.Name)
311-
entityKey := fmt.Sprintf(nodeInfoKey, node.Name)
318+
312319
return m.BindStatus(ctx, entityKey, statusKey, string(data), ttl)
313320
}
314321

@@ -328,9 +335,7 @@ func (m *Mercury) GetNodeStatus(ctx context.Context, nodename string) (*types.No
328335
}
329336

330337
// NodeStatusStream returns a stream of node status
331-
// it tells you if status of a node is changed, either PUT or DELETE
332-
// PUT -> Alive: true
333-
// DELETE -> Alive: false
338+
// it tells you if status of a node is changed
334339
func (m *Mercury) NodeStatusStream(ctx context.Context) chan *types.NodeStatus {
335340
ch := make(chan *types.NodeStatus)
336341
go func() {
@@ -353,6 +358,7 @@ func (m *Mercury) NodeStatusStream(ctx context.Context) chan *types.NodeStatus {
353358
Nodename: nodename,
354359
Alive: event.Type != clientv3.EventTypeDelete,
355360
}
361+
356362
node, err := m.GetNode(ctx, nodename)
357363
if err != nil {
358364
status.Error = err

store/etcdv3/node_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,13 @@ RdCPRPt513WozkJZZAjUSP2U
141141
m.config.CertPath = "/tmp"
142142
node3, err := m.doAddNode(ctx, nodename3, endpoint3, podname, ca, cert, certkey, cpu, share, memory, storage, labels, nil, nil, nil)
143143
assert.NoError(t, err)
144-
engine3, err := m.makeClient(ctx, node3, true)
144+
engine3, err := m.makeClient(ctx, node3)
145145
assert.NoError(t, err)
146146
_, err = engine3.Info(ctx)
147147
assert.Error(t, err)
148148
// failed by get key
149149
node3.Name = "nokey"
150-
_, err = m.makeClient(ctx, node3, true)
150+
_, err = m.makeClient(ctx, node3)
151151
assert.NoError(t, err)
152152
}
153153

@@ -246,7 +246,7 @@ func TestSetNodeStatus(t *testing.T) {
246246
Podname: node.Podname,
247247
})
248248
assert.NoError(err)
249-
assert.NoError(m.SetNodeStatus(context.TODO(), node, 1))
249+
assert.NoError(m.SetNodeStatus(context.TODO(), node, true, 1))
250250
key := filepath.Join(nodeStatusPrefix, node.Name)
251251

252252
// not expired yet
@@ -277,7 +277,7 @@ func TestGetNodeStatus(t *testing.T) {
277277
Podname: node.Podname,
278278
})
279279
assert.NoError(err)
280-
assert.NoError(m.SetNodeStatus(context.TODO(), node, 1))
280+
assert.NoError(m.SetNodeStatus(context.TODO(), node, true, 1))
281281

282282
// not expired yet
283283
ns, err := m.GetNodeStatus(context.TODO(), node.Name)
@@ -322,7 +322,7 @@ func TestNodeStatusStream(t *testing.T) {
322322
default:
323323
}
324324
time.Sleep(500 * time.Millisecond)
325-
assert.NoError(m.SetNodeStatus(context.TODO(), node, 1))
325+
assert.NoError(m.SetNodeStatus(context.TODO(), node, true, 1))
326326
}
327327
}()
328328

store/mocks/Store.go

+6-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

store/redis/node.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,14 @@ func (r *Rediaron) doGetNodes(ctx context.Context, kvs map[string]string, labels
290290
// SetNodeStatus sets status for a node, value will expire after ttl seconds
291291
// ttl should be larger than 0
292292
// this is heartbeat of node
293-
func (r *Rediaron) SetNodeStatus(ctx context.Context, node *types.Node, ttl int64) error {
293+
func (r *Rediaron) SetNodeStatus(ctx context.Context, node *types.Node, alive bool, ttl int64) error {
294+
key := filepath.Join(nodeStatusPrefix, node.Name)
295+
296+
if !alive {
297+
_, err := r.cli.Del(ctx, key).Result()
298+
return err
299+
}
300+
294301
if ttl <= 0 {
295302
return types.ErrNodeStatusTTL
296303
}
@@ -305,7 +312,6 @@ func (r *Rediaron) SetNodeStatus(ctx context.Context, node *types.Node, ttl int6
305312
}
306313

307314
// nodenames are unique
308-
key := filepath.Join(nodeStatusPrefix, node.Name)
309315
_, err = r.cli.Set(ctx, key, string(data), time.Duration(ttl)*time.Second).Result()
310316
return err
311317
}
@@ -326,9 +332,7 @@ func (r *Rediaron) GetNodeStatus(ctx context.Context, nodename string) (*types.N
326332
}
327333

328334
// NodeStatusStream returns a stream of node status
329-
// it tells you if status of a node is changed, either PUT or DELETE
330-
// PUT -> Alive: true
331-
// DELETE -> Alive: false
335+
// it tells you if status of a node is changed
332336
func (r *Rediaron) NodeStatusStream(ctx context.Context) chan *types.NodeStatus {
333337
ch := make(chan *types.NodeStatus)
334338
go func() {
@@ -345,6 +349,7 @@ func (r *Rediaron) NodeStatusStream(ctx context.Context) chan *types.NodeStatus
345349
Nodename: nodename,
346350
Alive: strings.ToLower(message.Action) != actionExpired,
347351
}
352+
348353
node, err := r.GetNode(ctx, nodename)
349354
if err != nil {
350355
status.Error = err

store/redis/node_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,13 @@ RdCPRPt513WozkJZZAjUSP2U
138138
s.rediaron.config.CertPath = "/tmp"
139139
node3, err := s.rediaron.doAddNode(ctx, nodename3, endpoint3, podname, ca, cert, certkey, cpu, share, memory, storage, labels, nil, nil, nil)
140140
s.NoError(err)
141-
engine3, err := s.rediaron.makeClient(ctx, node3, true)
141+
engine3, err := s.rediaron.makeClient(ctx, node3)
142142
s.NoError(err)
143143
_, err = engine3.Info(ctx)
144144
s.Error(err)
145145
// failed by get key
146146
node3.Name = "nokey"
147-
_, err = s.rediaron.makeClient(ctx, node3, true)
147+
_, err = s.rediaron.makeClient(ctx, node3)
148148
s.NoError(err)
149149
}
150150

@@ -224,7 +224,7 @@ func (s *RediaronTestSuite) TestSetNodeStatus() {
224224
Podname: "testpod",
225225
},
226226
}
227-
s.NoError(s.rediaron.SetNodeStatus(context.TODO(), node, 1))
227+
s.NoError(s.rediaron.SetNodeStatus(context.TODO(), node, true, 1))
228228
key := filepath.Join(nodeStatusPrefix, node.Name)
229229

230230
// not expired yet
@@ -246,7 +246,7 @@ func (s *RediaronTestSuite) TestGetNodeStatus() {
246246
Podname: "testpod",
247247
},
248248
}
249-
s.NoError(s.rediaron.SetNodeStatus(context.TODO(), node, 1))
249+
s.NoError(s.rediaron.SetNodeStatus(context.TODO(), node, true, 1))
250250

251251
// not expired yet
252252
ns, err := s.rediaron.GetNodeStatus(context.TODO(), node.Name)
@@ -281,7 +281,7 @@ func (s *RediaronTestSuite) TestNodeStatusStream() {
281281
default:
282282
}
283283
time.Sleep(500 * time.Millisecond)
284-
s.NoError(s.rediaron.SetNodeStatus(context.TODO(), node, 1))
284+
s.NoError(s.rediaron.SetNodeStatus(context.TODO(), node, true, 1))
285285
// manually trigger
286286
triggerMockedKeyspaceNotification(s.rediaron.cli, filepath.Join(nodeStatusPrefix, node.Name), actionSet)
287287
}

store/store.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type Store interface {
3939
GetNodesByPod(ctx context.Context, podname string, labels map[string]string, all bool) ([]*types.Node, error)
4040
UpdateNodes(context.Context, ...*types.Node) error
4141
UpdateNodeResource(ctx context.Context, node *types.Node, resource *types.ResourceMeta, action string) error
42-
SetNodeStatus(ctx context.Context, node *types.Node, ttl int64) error
42+
SetNodeStatus(ctx context.Context, node *types.Node, alive bool, ttl int64) error
4343
GetNodeStatus(ctx context.Context, nodename string) (*types.NodeStatus, error)
4444
NodeStatusStream(ctx context.Context) chan *types.NodeStatus
4545

0 commit comments

Comments
 (0)