Skip to content

Commit 762804b

Browse files
anrsanrs
and
anrs
authored
fix: should waiting for the ephemeral thread done (#326)
Co-authored-by: anrs <anders.hu@shopee.com>
1 parent 06419bc commit 762804b

File tree

3 files changed

+52
-1
lines changed

3 files changed

+52
-1
lines changed

store/etcdv3/meta/ephemeral.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package meta
22

33
import (
44
"context"
5+
"sync"
56
"time"
67

78
"go.etcd.io/etcd/v3/clientv3"
@@ -30,7 +31,10 @@ func (e *ETCD) StartEphemeral(ctx context.Context, path string, heartbeat time.D
3031
ctx, cancel := context.WithCancel(context.Background())
3132
expiry := make(chan struct{})
3233

34+
var wg sync.WaitGroup
35+
wg.Add(1)
3336
go func() {
37+
defer wg.Done()
3438
defer close(expiry)
3539

3640
tick := time.NewTicker(heartbeat / 3)
@@ -58,5 +62,8 @@ func (e *ETCD) StartEphemeral(ctx context.Context, path string, heartbeat time.D
5862
}
5963
}()
6064

61-
return expiry, func() { cancel() }, nil
65+
return expiry, func() {
66+
cancel()
67+
wg.Wait()
68+
}, nil
6269
}

store/etcdv3/meta/ephemeral_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,28 @@ import (
88
"github.com/stretchr/testify/require"
99
)
1010

11+
func TestEphemeralDeregister(t *testing.T) {
12+
m := NewEmbeddedETCD(t)
13+
defer m.TerminateEmbededStorage()
14+
15+
ctx := context.Background()
16+
path := "/ident"
17+
heartbeat := time.Millisecond
18+
expiry, stop, err := m.StartEphemeral(ctx, path, heartbeat)
19+
require.NoError(t, err)
20+
require.NotNil(t, stop)
21+
require.NotNil(t, expiry)
22+
23+
kv, err := m.GetOne(ctx, path)
24+
require.NoError(t, err)
25+
require.Equal(t, path, string(kv.Key))
26+
27+
stop()
28+
kv, err = m.GetOne(ctx, path)
29+
require.Error(t, err)
30+
require.Nil(t, kv)
31+
}
32+
1133
func TestEphemeral(t *testing.T) {
1234
m := NewEmbeddedETCD(t)
1335
defer m.TerminateEmbededStorage()

store/etcdv3/service_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,35 @@ package etcdv3
22

33
import (
44
"context"
5+
"fmt"
56
"sort"
67
"testing"
78
"time"
89

910
"github.com/stretchr/testify/assert"
1011
)
1112

13+
func TestRegisterServiceWithDeregister(t *testing.T) {
14+
m := NewMercury(t)
15+
defer m.TerminateEmbededStorage()
16+
17+
ctx := context.Background()
18+
svc := "svc"
19+
path := fmt.Sprintf(serviceStatusKey, svc)
20+
_, deregister, err := m.RegisterService(ctx, svc, time.Minute)
21+
assert.NoError(t, err)
22+
23+
kv, err := m.GetOne(ctx, path)
24+
assert.NoError(t, err)
25+
assert.Equal(t, path, string(kv.Key))
26+
27+
deregister()
28+
//time.Sleep(time.Second)
29+
kv, err = m.GetOne(ctx, path)
30+
assert.Error(t, err)
31+
assert.Nil(t, kv)
32+
}
33+
1234
func TestServiceStatusStream(t *testing.T) {
1335
m := NewMercury(t)
1436
defer m.TerminateEmbededStorage()

0 commit comments

Comments
 (0)