Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: only remove node status when the node is unavailable #516

Merged
merged 2 commits into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
n.Available = (opts.StatusOpt == types.TriTrue) || (opts.StatusOpt == types.TriKeep && n.Available)
n.Bypass = (opts.BypassOpt == types.TriTrue) || (opts.BypassOpt == types.TriKeep && n.Bypass)
if n.IsDown() {
logger.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
log.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
}
if !n.Available {
// remove node status
err := c.store.SetNodeStatus(ctx, node, -1)
if err != nil {
if err := c.store.SetNodeStatus(ctx, node, -1); err != nil {
// don't return here
log.Errorf(ctx, "[SetNode] failed to set node status, err: %+v", errors.WithStack(err))
logger.Errorf(ctx, "[SetNode] failed to remove node status, err: %+v", errors.WithStack(err))
}
}
if opts.WorkloadsDown {
Expand Down Expand Up @@ -105,7 +106,9 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ

// mark workload which belongs to this node as unhealthy
if err = c.store.SetWorkloadStatus(ctx, workload.StatusMeta, 0); err != nil {
log.Errorf(ctx, "[SetNodeAvailable] Set workload %s on node %s inactive failed %v", workload.ID, opts.Nodename, err)
log.Errorf(ctx, "[SetNodeAvailable] Set workload %s on node %s as inactive failed %v", workload.ID, opts.Nodename, errors.WithStack(err))
} else {
log.Infof(ctx, "[SetNodeAvailable] Set workload %s on node %s as inactive", workload.ID, opts.Nodename)
}
}
}
Expand Down
116 changes: 100 additions & 16 deletions store/etcdv3/meta/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package meta
import (
"context"
"crypto/tls"
"errors"
"fmt"
"strconv"
"sync"
Expand Down Expand Up @@ -239,6 +240,34 @@ func (e *ETCD) BatchUpdate(ctx context.Context, data map[string]string, opts ...
return e.batchUpdate(ctx, data, opts...)
}

// isTTLChanged returns true if there is a lease with a different ttl bound to the key
func (e *ETCD) isTTLChanged(ctx context.Context, key string, ttl int64) (bool, error) {
resp, err := e.GetOne(ctx, key)
if err != nil {
if errors.Is(err, types.ErrBadCount) {
return ttl != 0, nil
}
return false, err
}

leaseID := clientv3.LeaseID(resp.Lease)
if leaseID == 0 {
return ttl != 0, nil
}

getTTLResp, err := e.cliv3.TimeToLive(ctx, leaseID)
if err != nil {
return false, err
}

changed := getTTLResp.GrantedTTL != ttl
if changed {
log.Infof(ctx, "[isTTLChanged] key %v ttl changed from %v to %v", key, getTTLResp.GrantedTTL, ttl)
}

return changed, nil
}

// BindStatus keeps on a lease alive.
func (e *ETCD) BindStatus(ctx context.Context, entityKey, statusKey, statusValue string, ttl int64) error {
if ttl == 0 {
Expand All @@ -256,19 +285,38 @@ func (e *ETCD) bindStatusWithTTL(ctx context.Context, entityKey, statusKey, stat
leaseID := lease.ID
updateStatus := []clientv3.Op{clientv3.OpPut(statusKey, statusValue, clientv3.WithLease(lease.ID))}

entityTxn, err := e.cliv3.Txn(ctx).
If(clientv3.Compare(clientv3.Version(entityKey), "!=", 0)).
Then( // making sure there's an exists entity kv-pair.
clientv3.OpTxn(
[]clientv3.Cmp{clientv3.Compare(clientv3.Version(statusKey), "!=", 0)}, // Is the status exists?
[]clientv3.Op{clientv3.OpTxn( // there's an exists status
[]clientv3.Cmp{clientv3.Compare(clientv3.Value(statusKey), "=", statusValue)},
[]clientv3.Op{clientv3.OpGet(statusKey)}, // The status hasn't been changed.
updateStatus, // The status had been changed.
)},
updateStatus, // there isn't a status
),
).Commit()
ttlChanged, err := e.isTTLChanged(ctx, statusKey, ttl)
if err != nil {
return err
}

var entityTxn *clientv3.TxnResponse

if ttlChanged {
entityTxn, err = e.cliv3.Txn(ctx).
If(clientv3.Compare(clientv3.Version(entityKey), "!=", 0)).
Then(updateStatus...). // making sure there's an exists entity kv-pair.
Commit()
} else {
entityTxn, err = e.cliv3.Txn(ctx).
If(clientv3.Compare(clientv3.Version(entityKey), "!=", 0)).
Then( // making sure there's an exists entity kv-pair.
clientv3.OpTxn(
[]clientv3.Cmp{clientv3.Compare(clientv3.Version(statusKey), "!=", 0)}, // Is the status exists?
[]clientv3.Op{clientv3.OpTxn( // there's an exists status
[]clientv3.Cmp{clientv3.Compare(clientv3.LeaseValue(statusKey), "!=", 0)}, //
[]clientv3.Op{clientv3.OpTxn( // there has been a lease bound to the status
[]clientv3.Cmp{clientv3.Compare(clientv3.Value(statusKey), "=", statusValue)}, // Is the status changed?
[]clientv3.Op{clientv3.OpGet(statusKey)}, // The status hasn't been changed.
updateStatus, // The status had been changed.
)},
updateStatus, // there is no lease bound to the status
)},
updateStatus, // there isn't a status
),
).Commit()
}

if err != nil {
e.revokeLease(ctx, leaseID)
return err
Expand All @@ -280,15 +328,30 @@ func (e *ETCD) bindStatusWithTTL(ctx context.Context, entityKey, statusKey, stat
return types.ErrEntityNotExists
}

// if ttl is changed, replace with the new lease
if ttlChanged {
log.Infof(ctx, "[bindStatusWithTTL] put: key %s value %s", statusKey, statusValue)
return nil
}

// There isn't a status bound to the entity.
statusTxn := entityTxn.Responses[0].GetResponseTxn()
if !statusTxn.Succeeded {
log.Infof(ctx, "[bindStatusWithTTL] put: key %s value %s", statusKey, statusValue)
return nil
}

// There is no lease bound to the status yet
leaseTxn := statusTxn.Responses[0].GetResponseTxn()
if !leaseTxn.Succeeded {
log.Infof(ctx, "[bindStatusWithTTL] put: key %s value %s", statusKey, statusValue)
return nil
}

// There is a status bound to the entity yet but its value isn't same as the expected one.
valueTxn := statusTxn.Responses[0].GetResponseTxn()
valueTxn := leaseTxn.Responses[0].GetResponseTxn()
if !valueTxn.Succeeded {
log.Infof(ctx, "[bindStatusWithTTL] put: key %s value %s", statusKey, statusValue)
return nil
}

Expand All @@ -309,7 +372,22 @@ func (e *ETCD) bindStatusWithTTL(ctx context.Context, entityKey, statusKey, stat
// agent may report status earlier when core has not recorded the entity.
func (e *ETCD) bindStatusWithoutTTL(ctx context.Context, statusKey, statusValue string) error {
updateStatus := []clientv3.Op{clientv3.OpPut(statusKey, statusValue)}
_, err := e.cliv3.Txn(ctx).

ttlChanged, err := e.isTTLChanged(ctx, statusKey, 0)
if err != nil {
return err
}
if ttlChanged {
_, err := e.Put(ctx, statusKey, statusValue)
if err != nil {
return err
}

log.Infof(ctx, "[bindStatusWithoutTTL] put: key %s value %s", statusKey, statusValue)
return nil
}

resp, err := e.cliv3.Txn(ctx).
If(clientv3.Compare(clientv3.Version(statusKey), "!=", 0)). // if there's an existing status key
Then(clientv3.OpTxn( // deal with existing status key
[]clientv3.Cmp{clientv3.Compare(clientv3.Value(statusKey), "!=", statusValue)}, // if the new value != the old value
Expand All @@ -318,7 +396,13 @@ func (e *ETCD) bindStatusWithoutTTL(ctx context.Context, statusKey, statusValue
)).
Else(updateStatus...). // otherwise deal with non-existing status key
Commit()
return err
if err != nil {
return err
}
if !resp.Succeeded || resp.Responses[0].GetResponseTxn().Succeeded {
log.Infof(ctx, "[bindStatusWithoutTTL] put: key %s value %s", statusKey, statusValue)
}
return nil
}

func (e *ETCD) revokeLease(ctx context.Context, leaseID clientv3.LeaseID) {
Expand Down
72 changes: 39 additions & 33 deletions store/etcdv3/meta/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
func TestGetOneError(t *testing.T) {
e := NewMockedETCD(t)
expErr := fmt.Errorf("exp")
e.cliv3.(*mocks.ETCDClientV3).On("Get", mock.Anything, mock.Anything).Return(nil, expErr).Once()
e.cliv3.(*mocks.ETCDClientV3).On("Get", mock.Anything, mock.Anything).Return(nil, expErr)
kv, err := e.GetOne(context.Background(), "foo")
require.Equal(t, expErr, err)
require.Nil(t, kv)
Expand All @@ -29,7 +29,7 @@ func TestGetOneError(t *testing.T) {
func TestGetOneFailedAsRespondMore(t *testing.T) {
e := NewMockedETCD(t)
expResp := &clientv3.GetResponse{Count: 2}
e.cliv3.(*mocks.ETCDClientV3).On("Get", mock.Anything, mock.Anything).Return(expResp, nil).Once()
e.cliv3.(*mocks.ETCDClientV3).On("Get", mock.Anything, mock.Anything).Return(expResp, nil)
kv, err := e.GetOne(context.Background(), "foo")
require.Error(t, err)
require.Nil(t, kv)
Expand All @@ -46,10 +46,10 @@ func TestGetMultiFailedAsBatchGetError(t *testing.T) {
e := NewMockedETCD(t)
expErr := fmt.Errorf("exp")
expTxn := &mocks.Txn{}
expTxn.On("If", mock.Anything).Return(expTxn).Once()
expTxn.On("Then", mock.Anything).Return(expTxn).Once()
expTxn.On("Else", mock.Anything).Return(expTxn).Once()
expTxn.On("Commit").Return(nil, expErr).Once()
expTxn.On("If", mock.Anything).Return(expTxn)
expTxn.On("Then", mock.Anything).Return(expTxn)
expTxn.On("Else", mock.Anything).Return(expTxn)
expTxn.On("Commit").Return(nil, expErr)
e.cliv3.(*mocks.ETCDClientV3).On("Txn", mock.Anything).Return(expTxn)
kvs, err := e.GetMulti(context.Background(), []string{"foo"})
require.Equal(t, expErr, err)
Expand All @@ -69,7 +69,7 @@ func TestBindStatusFailedAsGrantError(t *testing.T) {
e, etcd, assert := testKeepAliveETCD(t)
defer assert()
expErr := fmt.Errorf("exp")
etcd.On("Grant", mock.Anything, mock.Anything).Return(nil, expErr).Once()
etcd.On("Grant", mock.Anything, mock.Anything).Return(nil, expErr)
require.Equal(t, expErr, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
}

Expand All @@ -80,12 +80,13 @@ func TestBindStatusFailedAsCommitError(t *testing.T) {
expErr := fmt.Errorf("exp")
txn := &mocks.Txn{}
defer txn.AssertExpectations(t)
txn.On("If", mock.Anything).Return(txn).Once()
txn.On("Then", mock.Anything).Return(txn).Once()
txn.On("Commit").Return(nil, expErr).Once()
etcd.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{}, nil)
txn.On("If", mock.Anything).Return(txn)
txn.On("Then", mock.Anything).Return(txn)
txn.On("Commit").Return(nil, expErr)

etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil).Once()
etcd.On("Txn", mock.Anything).Return(txn).Once()
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil)
etcd.On("Txn", mock.Anything).Return(txn)
require.Equal(t, expErr, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
}

Expand All @@ -96,12 +97,13 @@ func TestBindStatusButEntityTxnUnsuccessful(t *testing.T) {
entityTxn := &clientv3.TxnResponse{Succeeded: false}
txn := &mocks.Txn{}
defer txn.AssertExpectations(t)
txn.On("If", mock.Anything).Return(txn).Once()
txn.On("Then", mock.Anything).Return(txn).Once()
etcd.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{}, nil)
txn.On("If", mock.Anything).Return(txn)
txn.On("Then", mock.Anything).Return(txn)
txn.On("Commit").Return(entityTxn, nil)

etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil).Once()
etcd.On("Txn", mock.Anything).Return(txn).Once()
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil)
etcd.On("Txn", mock.Anything).Return(txn)
require.Equal(t, types.ErrEntityNotExists, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
}

Expand All @@ -122,12 +124,13 @@ func TestBindStatusButStatusTxnUnsuccessful(t *testing.T) {
}
txn := &mocks.Txn{}
defer txn.AssertExpectations(t)
txn.On("If", mock.Anything).Return(txn).Once()
txn.On("Then", mock.Anything).Return(txn).Once()
txn.On("If", mock.Anything).Return(txn)
txn.On("Then", mock.Anything).Return(txn)
txn.On("Commit").Return(entityTxn, nil)

etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil).Once()
etcd.On("Txn", mock.Anything).Return(txn).Once()
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil)
etcd.On("Txn", mock.Anything).Return(txn)
etcd.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{}, nil)
require.Equal(t, nil, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
}

Expand All @@ -148,12 +151,14 @@ func TestBindStatusWithZeroTTL(t *testing.T) {
}
txn := &mocks.Txn{}
defer txn.AssertExpectations(t)
txn.On("If", mock.Anything).Return(txn).Once()
txn.On("Then", mock.Anything).Return(txn).Once()
txn.On("Else", mock.Anything).Return(txn).Once()
txn.On("If", mock.Anything).Return(txn)
txn.On("Then", mock.Anything).Return(txn)
txn.On("Else", mock.Anything).Return(txn)
txn.On("Commit").Return(entityTxn, nil)

etcd.On("Txn", mock.Anything).Return(txn).Once()
etcd.On("Txn", mock.Anything).Return(txn)

etcd.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{}, nil)
require.Equal(t, nil, e.BindStatus(context.Background(), "/entity", "/status", "status", 0))
}

Expand Down Expand Up @@ -185,12 +190,13 @@ func TestBindStatusButValueTxnUnsuccessful(t *testing.T) {
}
txn := &mocks.Txn{}
defer txn.AssertExpectations(t)
txn.On("If", mock.Anything).Return(txn).Once()
txn.On("Then", mock.Anything).Return(txn).Once()
txn.On("If", mock.Anything).Return(txn)
txn.On("Then", mock.Anything).Return(txn)
txn.On("Commit").Return(entityTxn, nil)

etcd.On("Txn", mock.Anything).Return(txn).Once()
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil).Once()
etcd.On("Txn", mock.Anything).Return(txn)
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil)
etcd.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{}, nil)
require.Equal(t, nil, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
}

Expand Down Expand Up @@ -236,13 +242,13 @@ func TestBindStatus(t *testing.T) {
}
txn := &mocks.Txn{}
defer txn.AssertExpectations(t)
txn.On("If", mock.Anything).Return(txn).Once()
txn.On("Then", mock.Anything).Return(txn).Once()
txn.On("If", mock.Anything).Return(txn)
txn.On("Then", mock.Anything).Return(txn)
txn.On("Commit").Return(entityTxn, nil)

etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil).Once()
etcd.On("Txn", mock.Anything).Return(txn).Once()
etcd.On("KeepAliveOnce", mock.Anything, clientv3.LeaseID(leaseID)).Return(nil, nil).Once()
etcd.On("Grant", mock.Anything, mock.Anything).Return(&clientv3.LeaseGrantResponse{}, nil)
etcd.On("Txn", mock.Anything).Return(txn)
etcd.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{}, nil)
require.Equal(t, nil, e.BindStatus(context.Background(), "/entity", "/status", "status", 1))
}

Expand Down