Skip to content

Commit 2a8152d

Browse files
check L3 reachability before update grpc service addr (#395)
1 parent cfb89c7 commit 2a8152d

File tree

6 files changed

+161
-6
lines changed

6 files changed

+161
-6
lines changed

client/resolver/eru/resolver.go

-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ func (r *Resolver) sync() {
6565
}
6666

6767
var addresses []resolver.Address
68-
log.Debugf("[EruResolver] update state: %v", endpoints)
6968
for _, ep := range endpoints {
7069
addresses = append(addresses, resolver.Address{Addr: ep})
7170
}

client/servicediscovery/eru_service_discovery.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/projecteru2/core/auth"
1010
"github.com/projecteru2/core/client/interceptor"
11+
"github.com/projecteru2/core/client/utils"
1112
"github.com/projecteru2/core/log"
1213
pb "github.com/projecteru2/core/rpc/gen"
1314
"github.com/projecteru2/core/types"
@@ -37,6 +38,9 @@ func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err
3738
}
3839
client := pb.NewCoreRPCClient(cc)
3940
ch := make(chan []string)
41+
epPusher := utils.NewEndpointPusher()
42+
epPusher.Register(ch)
43+
epPusher.Register(lbResolverBuilder.updateCh)
4044
go func() {
4145
defer close(ch)
4246
for {
@@ -68,8 +72,8 @@ func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err
6872
break
6973
}
7074
expectedInterval = time.Duration(status.GetIntervalInSecond())
71-
lbResolverBuilder.updateCh <- status.GetAddresses()
72-
ch <- status.GetAddresses()
75+
76+
epPusher.Push(status.GetAddresses())
7377
}
7478
}
7579
}()

client/servicediscovery/resolver.go

-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package servicediscovery
22

33
import (
4-
"github.com/projecteru2/core/log"
5-
64
"google.golang.org/grpc/resolver"
75
)
86

@@ -22,7 +20,6 @@ func newLBResolver(cc resolver.ClientConn, endpoint string, updateCh <-chan []st
2220
}
2321

2422
func (r *lbResolver) updateAddresses(endpoints ...string) {
25-
log.Debugf("[lbResolver] update state: %v", endpoints)
2623
addresses := []resolver.Address{}
2724
for _, ep := range endpoints {
2825
addresses = append(addresses, resolver.Address{Addr: ep})

client/utils/servicepusher.go

+151
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package utils
2+
3+
import (
4+
"context"
5+
"errors"
6+
"strings"
7+
"sync"
8+
"time"
9+
10+
"github.com/go-ping/ping"
11+
"github.com/projecteru2/core/log"
12+
)
13+
14+
// EndpointPusher pushes endpoints to registered channels if the ep is L3 reachable
15+
type EndpointPusher struct {
16+
chans []chan []string
17+
pendingEndpoints sync.Map
18+
availableEndpoints sync.Map
19+
}
20+
21+
// NewEndpointPusher .
22+
func NewEndpointPusher() *EndpointPusher {
23+
return &EndpointPusher{}
24+
}
25+
26+
// Register registers a channel that will receive the endpoints later
27+
func (p *EndpointPusher) Register(ch chan []string) {
28+
p.chans = append(p.chans, ch)
29+
}
30+
31+
// Push pushes endpoint candicates
32+
func (p *EndpointPusher) Push(endpoints []string) {
33+
p.delOutdated(endpoints)
34+
p.addCheck(endpoints)
35+
}
36+
37+
func (p *EndpointPusher) delOutdated(endpoints []string) {
38+
newEps := make(map[string]struct{})
39+
for _, e := range endpoints {
40+
newEps[e] = struct{}{}
41+
}
42+
43+
p.pendingEndpoints.Range(func(key, value interface{}) bool {
44+
ep, ok := key.(string)
45+
if !ok {
46+
log.Error("[EruResolver] failed to cast key while ranging pendingEndpoints")
47+
return true
48+
}
49+
cancel, ok := value.(context.CancelFunc)
50+
if !ok {
51+
log.Error("[EruResolver] failed to cast value while ranging pendingEndpoints")
52+
}
53+
if _, ok := newEps[ep]; !ok {
54+
cancel()
55+
p.pendingEndpoints.Delete(ep)
56+
log.Debugf("[EruResolver] pending endpoint deleted: %s", ep)
57+
}
58+
return true
59+
})
60+
61+
p.availableEndpoints.Range(func(key, _ interface{}) bool {
62+
ep, ok := key.(string)
63+
if !ok {
64+
log.Error("[EruResolver] failed to cast key while ranging availableEndpoints")
65+
return true
66+
}
67+
if _, ok := newEps[ep]; !ok {
68+
p.availableEndpoints.Delete(ep)
69+
log.Debugf("[EruResolver] available endpoint deleted: %s", ep)
70+
}
71+
return true
72+
})
73+
}
74+
75+
func (p *EndpointPusher) addCheck(endpoints []string) {
76+
for _, endpoint := range endpoints {
77+
if _, ok := p.pendingEndpoints.Load(endpoint); ok {
78+
continue
79+
}
80+
if _, ok := p.availableEndpoints.Load(endpoint); ok {
81+
continue
82+
}
83+
84+
ctx, cancel := context.WithCancel(context.Background())
85+
p.pendingEndpoints.Store(endpoint, cancel)
86+
go p.pollReachability(ctx, endpoint)
87+
log.Debugf("[EruResolver] pending endpoint added: %s", endpoint)
88+
}
89+
}
90+
func (p *EndpointPusher) pollReachability(ctx context.Context, endpoint string) {
91+
parts := strings.Split(endpoint, ":")
92+
if len(parts) != 2 {
93+
log.Errorf("[EruResolver] wrong format of endpoint: %s", endpoint)
94+
return
95+
}
96+
97+
for {
98+
select {
99+
case <-ctx.Done():
100+
log.Debugf("[EruResolver] reachability goroutine ends: %s", endpoint)
101+
return
102+
default:
103+
}
104+
105+
time.Sleep(time.Second)
106+
if err := p.checkReachability(parts[0]); err != nil {
107+
continue
108+
}
109+
110+
p.pendingEndpoints.Delete(endpoint)
111+
p.availableEndpoints.Store(endpoint, struct{}{})
112+
p.pushEndpoints()
113+
log.Debugf("[EruResolver] available endpoint added: %s", endpoint)
114+
return
115+
}
116+
}
117+
118+
func (p *EndpointPusher) checkReachability(host string) (err error) {
119+
pinger, err := ping.NewPinger(host)
120+
if err != nil {
121+
log.Errorf("[EruResolver] failed to create pinger: %+v", err)
122+
return
123+
}
124+
defer pinger.Stop()
125+
126+
pinger.Count = 1
127+
pinger.Timeout = time.Second
128+
if err = pinger.Run(); err != nil {
129+
return
130+
}
131+
if pinger.Statistics().PacketsRecv != 1 {
132+
return errors.New("icmp packet lost")
133+
}
134+
return
135+
}
136+
137+
func (p *EndpointPusher) pushEndpoints() {
138+
endpoints := []string{}
139+
p.availableEndpoints.Range(func(key, value interface{}) bool {
140+
endpoint, ok := key.(string)
141+
if !ok {
142+
log.Error("[EruResolver] failed to cast key while ranging availableEndpoints")
143+
return true
144+
}
145+
endpoints = append(endpoints, endpoint)
146+
return true
147+
})
148+
for _, ch := range p.chans {
149+
ch <- endpoints
150+
}
151+
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ require (
2020
github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7 // indirect
2121
github.com/getsentry/sentry-go v0.9.0
2222
github.com/go-git/go-git/v5 v5.2.0
23+
github.com/go-ping/ping v0.0.0-20210407214646-e4e642a95741
2324
github.com/go-redis/redis/v8 v8.8.0
2425
github.com/golang/protobuf v1.4.2
2526
github.com/google/uuid v1.1.1

go.sum

+3
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
151151
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
152152
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
153153
github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8=
154+
github.com/go-ping/ping v0.0.0-20210407214646-e4e642a95741 h1:b0sLP++Tsle+s57tqg5sUk1/OQsC6yMCciVeqNzOcwU=
155+
github.com/go-ping/ping v0.0.0-20210407214646-e4e642a95741/go.mod h1:35JbSyV/BYqHwwRA6Zr1uVDm1637YlNOU61wI797NPI=
154156
github.com/go-redis/redis/v8 v8.1.0/go.mod h1:isLoQT/NFSP7V67lyvM9GmdvLdyZ7pEhsXvvyQtnQTo=
155157
github.com/go-redis/redis/v8 v8.8.0 h1:fDZP58UN/1RD3DjtTXP/fFZ04TFohSYhjZDkcDe2dnw=
156158
github.com/go-redis/redis/v8 v8.8.0/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqWMnCV1iP5Y=
@@ -540,6 +542,7 @@ golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLL
540542
golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
541543
golang.org/x/net v0.0.0-20200319234117-63522dbf7eec/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
542544
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
545+
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
543546
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
544547
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U=
545548
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=

0 commit comments

Comments
 (0)