Skip to content

Commit c5f33e0

Browse files
don't crash when service key exists (#325)
1 parent 762804b commit c5f33e0

File tree

3 files changed

+21
-8
lines changed

3 files changed

+21
-8
lines changed

cluster/calcium/service.go

+16-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"sync"
66
"time"
77

8+
"github.com/pkg/errors"
89
"github.com/projecteru2/core/log"
910
"github.com/projecteru2/core/types"
1011
"github.com/projecteru2/core/utils"
@@ -30,10 +31,21 @@ func (c *Calcium) RegisterService(ctx context.Context) (unregister func(), err e
3031
return
3132
}
3233

33-
expiry, unregisterService, err := c.registerService(ctx, serviceAddress)
34-
if err != nil {
35-
log.Errorf("[RegisterService] failed to first register service: %v", err)
36-
return
34+
var (
35+
expiry <-chan struct{}
36+
unregisterService func()
37+
)
38+
for {
39+
if expiry, unregisterService, err = c.registerService(ctx, serviceAddress); err == nil {
40+
break
41+
}
42+
if errors.Is(err, types.ErrKeyExists) {
43+
log.Debugf("[RegisterService] service key exists: %v", err)
44+
time.Sleep(time.Second)
45+
continue
46+
}
47+
log.Errorf("[RegisterService] failed to first register service: %+v", err)
48+
return nil, errors.WithStack(err)
3749
}
3850

3951
wg := &sync.WaitGroup{}

cluster/calcium/service_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func TestRegisterServiceFailed(t *testing.T) {
7676
defer cancel()
7777

7878
_, err := c.RegisterService(ctx)
79-
assert.Equal(t, experr, err)
79+
assert.EqualError(t, err, "error")
8080
}
8181

8282
func TestWatchServiceStatus(t *testing.T) {

store/etcdv3/meta/ephemeral.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"go.etcd.io/etcd/v3/clientv3"
99

10+
"github.com/pkg/errors"
1011
"github.com/projecteru2/core/log"
1112
"github.com/projecteru2/core/types"
1213
)
@@ -15,17 +16,17 @@ import (
1516
func (e *ETCD) StartEphemeral(ctx context.Context, path string, heartbeat time.Duration) (<-chan struct{}, func(), error) {
1617
lease, err := e.cliv3.Grant(ctx, int64(heartbeat/time.Second))
1718
if err != nil {
18-
return nil, nil, err
19+
return nil, nil, errors.WithStack(err)
1920
}
2021

2122
switch tx, err := e.cliv3.Txn(ctx).
2223
If(clientv3.Compare(clientv3.Version(path), "=", 0)).
2324
Then(clientv3.OpPut(path, "", clientv3.WithLease(lease.ID))).
2425
Commit(); {
2526
case err != nil:
26-
return nil, nil, err
27+
return nil, nil, errors.WithStack(err)
2728
case !tx.Succeeded:
28-
return nil, nil, types.NewDetailedErr(types.ErrKeyExists, path)
29+
return nil, nil, errors.Wrap(types.ErrKeyExists, path)
2930
}
3031

3132
ctx, cancel := context.WithCancel(context.Background())

0 commit comments

Comments
 (0)