Skip to content

Commit 6eb0177

Browse files
jschwinger233CMGS
authored andcommitted
store, cluster: add unittests for service
1 parent 6d7e5a3 commit 6eb0177

File tree

6 files changed

+162
-10
lines changed

6 files changed

+162
-10
lines changed

cluster/calcium/service.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,11 @@ func (c *Calcium) RegisterService(ctx context.Context) (unregister func(), err e
109109
done := make(chan struct{})
110110
ctx, cancel := context.WithCancel(ctx)
111111
go func() {
112+
defer close(done)
112113
defer func() {
113114
if err := c.store.UnregisterService(context.Background(), serviceAddress); err != nil {
114115
log.Errorf("[RegisterService] failed to unregister service: %v", err)
115116
}
116-
close(done)
117117
}()
118118

119119
timer := time.NewTicker(c.config.GRPCConfig.ServiceHeartbeatInterval / 2)

cluster/calcium/service_test.go

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package calcium
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
storemocks "github.com/projecteru2/core/store/mocks"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/mock"
13+
)
14+
15+
func TestServiceStatusStream(t *testing.T) {
16+
c := NewTestCluster()
17+
c.config.Bind = ":5001"
18+
c.config.GRPCConfig.ServiceHeartbeatInterval = 100 * time.Millisecond
19+
c.config.GRPCConfig.ServiceDiscoveryPushInterval = 10 * time.Second
20+
store := &storemocks.Store{}
21+
c.store = store
22+
23+
registered := map[string]int{}
24+
store.On("RegisterService", mock.AnythingOfType("*context.cancelCtx"), mock.AnythingOfType("string"), mock.AnythingOfType("time.Duration")).Return(
25+
func(_ context.Context, addr string, _ time.Duration) error {
26+
if v, ok := registered[addr]; ok {
27+
registered[addr] = v + 1
28+
} else {
29+
registered[addr] = 1
30+
}
31+
return nil
32+
},
33+
)
34+
store.On("UnregisterService", mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("string")).Return(
35+
func(_ context.Context, addr string) error {
36+
delete(registered, addr)
37+
return nil
38+
},
39+
)
40+
41+
ctx, cancel := context.WithCancel(context.Background())
42+
defer cancel()
43+
unregister, err := c.RegisterService(ctx)
44+
assert.NoError(t, err)
45+
assert.Equal(t, len(registered), 1)
46+
for _, v := range registered {
47+
assert.Equal(t, v, 1)
48+
}
49+
time.Sleep(100 * time.Millisecond)
50+
for _, v := range registered {
51+
assert.Equal(t, v, 2)
52+
}
53+
unregister()
54+
assert.Equal(t, len(registered), 0)
55+
}
56+
57+
func TestWatchServiceStatus(t *testing.T) {
58+
c := NewTestCluster()
59+
c.config.GRPCConfig.ServiceDiscoveryPushInterval = 500 * time.Millisecond
60+
store := &storemocks.Store{}
61+
c.store = store
62+
c.watcher = &serviceWatcher{}
63+
64+
store.On("ServiceStatusStream", mock.AnythingOfType("*context.emptyCtx")).Return(
65+
func(_ context.Context) chan []string {
66+
ch := make(chan []string)
67+
go func() {
68+
ticker := time.NewTicker(50 * time.Millisecond)
69+
cnt := 0
70+
for range ticker.C {
71+
if cnt == 2 {
72+
break
73+
}
74+
ch <- []string{fmt.Sprintf("127.0.0.1:500%d", cnt)}
75+
cnt++
76+
}
77+
}()
78+
return ch
79+
}, nil,
80+
)
81+
82+
ch, err := c.WatchServiceStatus(context.Background())
83+
assert.NoError(t, err)
84+
ch2, err := c.WatchServiceStatus(context.Background())
85+
assert.NoError(t, err)
86+
wg := sync.WaitGroup{}
87+
wg.Add(2)
88+
go func() {
89+
defer wg.Done()
90+
assert.Equal(t, (<-ch).Addresses, []string{"127.0.0.1:5000"})
91+
assert.Equal(t, (<-ch).Addresses, []string{"127.0.0.1:5001"})
92+
assert.Equal(t, (<-ch).Addresses, []string{"127.0.0.1:5001"})
93+
}()
94+
go func() {
95+
defer wg.Done()
96+
assert.Equal(t, (<-ch2).Addresses, []string{"127.0.0.1:5000"})
97+
assert.Equal(t, (<-ch2).Addresses, []string{"127.0.0.1:5001"})
98+
assert.Equal(t, (<-ch2).Addresses, []string{"127.0.0.1:5001"})
99+
}()
100+
wg.Wait()
101+
}

core.yaml.sample

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
log_level: "DEBUG"
22
bind: ":5001"
3-
service_address: 10.22.12.87:5001
43
statsd: "127.0.0.1:8125"
54
profile: ":12346"
65
global_timeout: 300s
@@ -14,8 +13,8 @@ auth:
1413
grpc:
1514
max_concurrent_streams: 100
1615
max_recv_msg_size: 30 # will covert to MBytes
17-
service_discovery_interval: 10s # WatchServiceStatus push interval
18-
service_heartbeat_interval: 10s # RegisterService heartbeat
16+
service_discovery_interval: 5s # WatchServiceStatus push interval
17+
service_heartbeat_interval: 5s # RegisterService heartbeat
1918

2019
etcd:
2120
machines:

store/etcdv3/mercury.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ import (
2222
)
2323

2424
const (
25-
podInfoKey = "/pod/info/%s" // /pod/info/{podname}
26-
serviceStatusPrefix = "/services/"
25+
podInfoKey = "/pod/info/%s" // /pod/info/{podname}
26+
serviceStatusKey = "/services/%s" // /service/{ipv4:port}
2727

2828
nodeInfoKey = "/node/%s" // /node/{nodename}
2929
nodePodKey = "/node/%s:pod/%s" // /node/{podname}:pod/{nodename}

store/etcdv3/service.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package etcdv3
22

33
import (
44
"context"
5+
"fmt"
56
"strings"
67
"time"
78

@@ -41,7 +42,7 @@ func (m *Mercury) ServiceStatusStream(ctx context.Context) (chan []string, error
4142
go func() {
4243
defer close(ch)
4344
log.Info("[ServiceStatusStream] start watching service status")
44-
resp, err := m.Get(ctx, serviceStatusPrefix, clientv3.WithPrefix())
45+
resp, err := m.Get(ctx, fmt.Sprintf(serviceStatusKey, ""), clientv3.WithPrefix())
4546
if err != nil {
4647
log.Errorf("[ServiceStatusStream] failed to get current services: %v", err)
4748
return
@@ -52,7 +53,7 @@ func (m *Mercury) ServiceStatusStream(ctx context.Context) (chan []string, error
5253
}
5354
ch <- eps.ToSlice()
5455

55-
for resp := range m.watch(ctx, serviceStatusPrefix, clientv3.WithPrefix()) {
56+
for resp := range m.watch(ctx, fmt.Sprintf(serviceStatusKey, ""), clientv3.WithPrefix()) {
5657
if resp.Err() != nil {
5758
if !resp.Canceled {
5859
log.Errorf("[ServiceStatusStream] watch failed %v", resp.Err())
@@ -84,7 +85,7 @@ func (m *Mercury) ServiceStatusStream(ctx context.Context) (chan []string, error
8485

8586
// RegisterService put /services/{address}
8687
func (m *Mercury) RegisterService(ctx context.Context, serviceAddress string, expire time.Duration) error {
87-
key := serviceStatusPrefix + serviceAddress
88+
key := fmt.Sprintf(serviceStatusKey, serviceAddress)
8889
lease, err := m.cliv3.Grant(ctx, int64(expire/time.Second))
8990
if err != nil {
9091
return err
@@ -96,7 +97,7 @@ func (m *Mercury) RegisterService(ctx context.Context, serviceAddress string, ex
9697

9798
// UnregisterService del /services/{address}
9899
func (m *Mercury) UnregisterService(ctx context.Context, serviceAddress string) error {
99-
key := serviceStatusPrefix + serviceAddress
100+
key := fmt.Sprintf(serviceStatusKey, serviceAddress)
100101
_, err := m.Delete(ctx, key)
101102
return err
102103
}

store/etcdv3/service_test.go

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package etcdv3
2+
3+
import (
4+
"context"
5+
"sort"
6+
"strings"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func TestRegisterService(t *testing.T) {
14+
m := NewMercury(t)
15+
defer m.TerminateEmbededStorage()
16+
ctx := context.Background()
17+
expire := 100 * time.Millisecond
18+
err := m.RegisterService(ctx, "127.0.0.1:5002", expire)
19+
assert.NoError(t, err)
20+
kv, err := m.GetOne(ctx, "/services/127.0.0.1:5002")
21+
assert.NoError(t, err)
22+
assert.True(t, strings.HasSuffix(string(kv.Key), "127.0.0.1:5002"))
23+
}
24+
25+
func TestUnregisterService(t *testing.T) {
26+
m := NewMercury(t)
27+
defer m.TerminateEmbededStorage()
28+
ctx := context.Background()
29+
addr := "127.0.0.1:5002"
30+
assert.NoError(t, m.RegisterService(ctx, addr, time.Second))
31+
assert.NoError(t, m.UnregisterService(ctx, addr))
32+
_, err := m.GetOne(ctx, "/services/127.0.0.1:5002")
33+
assert.Error(t, err, "bad `Count` value")
34+
}
35+
36+
func TestServiceStatusStream(t *testing.T) {
37+
m := NewMercury(t)
38+
defer m.TerminateEmbededStorage()
39+
ctx, cancel := context.WithCancel(context.Background())
40+
defer cancel()
41+
assert.NoError(t, m.RegisterService(ctx, "127.0.0.1:5001", time.Second))
42+
ch, err := m.ServiceStatusStream(ctx)
43+
assert.NoError(t, err)
44+
assert.Equal(t, <-ch, []string{"127.0.0.1:5001"})
45+
assert.NoError(t, m.RegisterService(ctx, "127.0.0.1:5002", time.Second))
46+
endpoints := <-ch
47+
sort.Strings(endpoints)
48+
assert.Equal(t, endpoints, []string{"127.0.0.1:5001", "127.0.0.1:5002"})
49+
assert.NoError(t, m.UnregisterService(ctx, "127.0.0.1:5001"))
50+
assert.Equal(t, <-ch, []string{"127.0.0.1:5002"})
51+
}

0 commit comments

Comments
 (0)