Skip to content

Commit b1d9043

Browse files
committed
fix the issue that BindStatus cannot handle ttl changes
1 parent 7adbaf2 commit b1d9043

File tree

2 files changed

+101
-19
lines changed

2 files changed

+101
-19
lines changed

cluster/calcium/node.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,10 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
7171

7272
n.Available = (opts.StatusOpt == types.TriTrue) || (opts.StatusOpt == types.TriKeep && n.Available)
7373
n.Bypass = (opts.BypassOpt == types.TriTrue) || (opts.BypassOpt == types.TriKeep && n.Bypass)
74-
if !n.Bypass && !n.Available {
74+
if n.IsDown() {
7575
log.Errorf(ctx, "[SetNodeAvailable] node marked down: %s", opts.Nodename)
76+
}
77+
if !n.Available {
7678
// remove node status
7779
if err := c.store.SetNodeStatus(ctx, node, -1); err != nil {
7880
// don't return here
@@ -104,7 +106,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
104106

105107
// mark workload which belongs to this node as unhealthy
106108
if err = c.store.SetWorkloadStatus(ctx, workload.StatusMeta, 0); err != nil {
107-
log.Errorf(ctx, "[SetNodeAvailable] Set workload %s on node %s as inactive failed %v", workload.ID, opts.Nodename, err)
109+
log.Errorf(ctx, "[SetNodeAvailable] Set workload %s on node %s as inactive failed %v", workload.ID, opts.Nodename, errors.WithStack(err))
108110
} else {
109111
log.Infof(ctx, "[SetNodeAvailable] Set workload %s on node %s as inactive", workload.ID, opts.Nodename)
110112
}

store/etcdv3/meta/etcd.go

+97-17
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package meta
33
import (
44
"context"
55
"crypto/tls"
6+
"errors"
67
"fmt"
78
"strconv"
89
"sync"
@@ -239,6 +240,34 @@ func (e *ETCD) BatchUpdate(ctx context.Context, data map[string]string, opts ...
239240
return e.batchUpdate(ctx, data, opts...)
240241
}
241242

243+
// isTTLChanged returns true iff there is a lease with the same ttl bound to the key
244+
func (e *ETCD) isTTLChanged(ctx context.Context, key string, ttl int64) (bool, error) {
245+
resp, err := e.GetOne(ctx, key)
246+
if err != nil {
247+
if errors.Is(err, types.ErrBadCount) {
248+
return false, nil
249+
}
250+
return false, err
251+
}
252+
253+
leaseID := clientv3.LeaseID(resp.Lease)
254+
if leaseID == 0 {
255+
return true, nil
256+
}
257+
258+
getTTLResp, err := e.cliv3.TimeToLive(ctx, leaseID)
259+
if err != nil {
260+
return false, err
261+
}
262+
263+
changed := getTTLResp.GrantedTTL != ttl
264+
if changed {
265+
log.Infof(ctx, "[isTTLChanged] key %v ttl changed from %v to %v", key, getTTLResp.GrantedTTL, ttl)
266+
}
267+
268+
return changed, nil
269+
}
270+
242271
// BindStatus keeps on a lease alive.
243272
func (e *ETCD) BindStatus(ctx context.Context, entityKey, statusKey, statusValue string, ttl int64) error {
244273
if ttl == 0 {
@@ -256,19 +285,38 @@ func (e *ETCD) bindStatusWithTTL(ctx context.Context, entityKey, statusKey, stat
256285
leaseID := lease.ID
257286
updateStatus := []clientv3.Op{clientv3.OpPut(statusKey, statusValue, clientv3.WithLease(lease.ID))}
258287

259-
entityTxn, err := e.cliv3.Txn(ctx).
260-
If(clientv3.Compare(clientv3.Version(entityKey), "!=", 0)).
261-
Then( // making sure there's an exists entity kv-pair.
262-
clientv3.OpTxn(
263-
[]clientv3.Cmp{clientv3.Compare(clientv3.Version(statusKey), "!=", 0)}, // Is the status exists?
264-
[]clientv3.Op{clientv3.OpTxn( // there's an exists status
265-
[]clientv3.Cmp{clientv3.Compare(clientv3.Value(statusKey), "=", statusValue)},
266-
[]clientv3.Op{clientv3.OpGet(statusKey)}, // The status hasn't been changed.
267-
updateStatus, // The status had been changed.
268-
)},
269-
updateStatus, // there isn't a status
270-
),
271-
).Commit()
288+
ttlChanged, err := e.isTTLChanged(ctx, statusKey, ttl)
289+
if err != nil {
290+
return err
291+
}
292+
293+
var entityTxn *clientv3.TxnResponse
294+
295+
if ttlChanged {
296+
entityTxn, err = e.cliv3.Txn(ctx).
297+
If(clientv3.Compare(clientv3.Version(entityKey), "!=", 0)).
298+
Then(updateStatus...). // making sure there's an exists entity kv-pair.
299+
Commit()
300+
} else {
301+
entityTxn, err = e.cliv3.Txn(ctx).
302+
If(clientv3.Compare(clientv3.Version(entityKey), "!=", 0)).
303+
Then( // making sure there's an exists entity kv-pair.
304+
clientv3.OpTxn(
305+
[]clientv3.Cmp{clientv3.Compare(clientv3.Version(statusKey), "!=", 0)}, // Is the status exists?
306+
[]clientv3.Op{clientv3.OpTxn( // there's an exists status
307+
[]clientv3.Cmp{clientv3.Compare(clientv3.LeaseValue(statusKey), "!=", 0)}, //
308+
[]clientv3.Op{clientv3.OpTxn( // there has been a lease bound to the status
309+
[]clientv3.Cmp{clientv3.Compare(clientv3.Value(statusKey), "=", statusValue)}, // Is the status changed?
310+
[]clientv3.Op{clientv3.OpGet(statusKey)}, // The status hasn't been changed.
311+
updateStatus, // The status had been changed.
312+
)},
313+
updateStatus, // there is no lease bound to the status
314+
)},
315+
updateStatus, // there isn't a status
316+
),
317+
).Commit()
318+
}
319+
272320
if err != nil {
273321
e.revokeLease(ctx, leaseID)
274322
return err
@@ -280,14 +328,28 @@ func (e *ETCD) bindStatusWithTTL(ctx context.Context, entityKey, statusKey, stat
280328
return types.ErrEntityNotExists
281329
}
282330

331+
// if ttl is changed, replace with the new lease
332+
if ttlChanged {
333+
log.Infof(ctx, "[bindStatusWithTTL] put: key %s value %s", statusKey, statusValue)
334+
return nil
335+
}
336+
283337
// There isn't a status bound to the entity.
284338
statusTxn := entityTxn.Responses[0].GetResponseTxn()
285339
if !statusTxn.Succeeded {
340+
log.Infof(ctx, "[bindStatusWithTTL] put: key %s value %s", statusKey, statusValue)
341+
return nil
342+
}
343+
344+
// There is no lease bound to the status yet
345+
leaseTxn := statusTxn.Responses[0].GetResponseTxn()
346+
if !leaseTxn.Succeeded {
347+
log.Infof(ctx, "[bindStatusWithTTL] put: key %s value %s", statusKey, statusValue)
286348
return nil
287349
}
288350

289351
// There is a status bound to the entity yet but its value isn't same as the expected one.
290-
valueTxn := statusTxn.Responses[0].GetResponseTxn()
352+
valueTxn := leaseTxn.Responses[0].GetResponseTxn()
291353
if !valueTxn.Succeeded {
292354
log.Infof(ctx, "[bindStatusWithTTL] put: key %s value %s", statusKey, statusValue)
293355
return nil
@@ -310,7 +372,22 @@ func (e *ETCD) bindStatusWithTTL(ctx context.Context, entityKey, statusKey, stat
310372
// agent may report status earlier when core has not recorded the entity.
311373
func (e *ETCD) bindStatusWithoutTTL(ctx context.Context, statusKey, statusValue string) error {
312374
updateStatus := []clientv3.Op{clientv3.OpPut(statusKey, statusValue)}
313-
_, err := e.cliv3.Txn(ctx).
375+
376+
ttlChanged, err := e.isTTLChanged(ctx, statusKey, 0)
377+
if err != nil {
378+
return err
379+
}
380+
if ttlChanged {
381+
_, err := e.Put(ctx, statusKey, statusValue)
382+
if err != nil {
383+
return err
384+
}
385+
386+
log.Infof(ctx, "[bindStatusWithoutTTL] put: key %s value %s", statusKey, statusValue)
387+
return nil
388+
}
389+
390+
resp, err := e.cliv3.Txn(ctx).
314391
If(clientv3.Compare(clientv3.Version(statusKey), "!=", 0)). // if there's an existing status key
315392
Then(clientv3.OpTxn( // deal with existing status key
316393
[]clientv3.Cmp{clientv3.Compare(clientv3.Value(statusKey), "!=", statusValue)}, // if the new value != the old value
@@ -319,10 +396,13 @@ func (e *ETCD) bindStatusWithoutTTL(ctx context.Context, statusKey, statusValue
319396
)).
320397
Else(updateStatus...). // otherwise deal with non-existing status key
321398
Commit()
322-
if err == nil {
399+
if err != nil {
400+
return err
401+
}
402+
if !resp.Succeeded || resp.Responses[0].GetResponseTxn().Succeeded {
323403
log.Infof(ctx, "[bindStatusWithoutTTL] put: key %s value %s", statusKey, statusValue)
324404
}
325-
return err
405+
return nil
326406
}
327407

328408
func (e *ETCD) revokeLease(ctx context.Context, leaseID clientv3.LeaseID) {

0 commit comments

Comments
 (0)