Skip to content

Commit 2263dcf

Browse files
authored
fix a bug that could cause "Helium.dispatch" to panic (#552)
1 parent 4aa9a48 commit 2263dcf

File tree

2 files changed

+43
-0
lines changed

2 files changed

+43
-0
lines changed

discovery/helium/helium.go

+13
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
// Helium .
1616
type Helium struct {
1717
sync.Once
18+
lock *sync.RWMutex
1819
config types.GRPCConfig
1920
stor store.Store
2021
subs sync.Map
@@ -25,6 +26,7 @@ func New(config types.GRPCConfig, stor store.Store) *Helium {
2526
h := &Helium{}
2627
h.config = config
2728
h.stor = stor
29+
h.lock = &sync.RWMutex{}
2830
h.Do(func() {
2931
h.start(context.TODO()) // TODO rewrite ctx here, because this will run only once!
3032
})
@@ -33,13 +35,17 @@ func New(config types.GRPCConfig, stor store.Store) *Helium {
3335

3436
// Subscribe .
3537
func (h *Helium) Subscribe(ch chan<- types.ServiceStatus) uuid.UUID {
38+
h.lock.Lock()
39+
defer h.lock.Unlock()
3640
id := uuid.New()
3741
_, _ = h.subs.LoadOrStore(id, ch)
3842
return id
3943
}
4044

4145
// Unsubscribe .
4246
func (h *Helium) Unsubscribe(id uuid.UUID) {
47+
h.lock.Lock()
48+
defer h.lock.Unlock()
4349
h.subs.Delete(id)
4450
}
4551

@@ -77,7 +83,14 @@ func (h *Helium) start(ctx context.Context) {
7783
}
7884

7985
func (h *Helium) dispatch(status types.ServiceStatus) {
86+
h.lock.RLock()
87+
defer h.lock.RUnlock()
8088
h.subs.Range(func(k, v interface{}) bool {
89+
defer func() {
90+
if err := recover(); err != nil {
91+
log.Errorf(context.TODO(), "[dispatch] dispatch %s failed, err: %v", k, err)
92+
}
93+
}()
8194
c, ok := v.(chan<- types.ServiceStatus)
8295
if !ok {
8396
log.Error("[WatchServiceStatus] failed to cast channel from map")

discovery/helium/helium_test.go

+30
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,33 @@ func TestHelium(t *testing.T) {
4747
close(chAddr)
4848
close(chStatus)
4949
}
50+
51+
func TestPanic(t *testing.T) {
52+
chAddr := make(chan []string)
53+
54+
store := &storemocks.Store{}
55+
store.On("ServiceStatusStream", mock.Anything).Return(chAddr, nil)
56+
57+
grpcConfig := types.GRPCConfig{
58+
ServiceDiscoveryPushInterval: time.Duration(1) * time.Second,
59+
}
60+
service := New(grpcConfig, store)
61+
62+
for i := 0; i < 1000; i++ {
63+
go func() {
64+
chStatus := make(chan types.ServiceStatus)
65+
uuid := service.Subscribe(chStatus)
66+
time.Sleep(time.Second)
67+
service.Unsubscribe(uuid)
68+
close(chStatus)
69+
}()
70+
}
71+
72+
go func() {
73+
for i := 0; i < 1000; i++ {
74+
chAddr <- []string{"hhh", "hhh2"}
75+
}
76+
}()
77+
78+
time.Sleep(5 * time.Second)
79+
}

0 commit comments

Comments
 (0)