Skip to content

Commit 6b3e03a

Browse files
jschwinger233CMGS
authored andcommitted
client: resolver updates svc addresses periodically
1 parent 6eb0177 commit 6b3e03a

File tree

4 files changed

+133
-0
lines changed

4 files changed

+133
-0
lines changed

client/resolver/eru/builder.go

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package eru
2+
3+
import "google.golang.org/grpc/resolver"
4+
5+
type eruResolverBuilder struct{}
6+
7+
func init() {
8+
resolver.Register(&eruResolverBuilder{})
9+
}
10+
11+
// Scheme for interface
12+
func (b *eruResolverBuilder) Scheme() string {
13+
return "eru"
14+
}
15+
16+
// Build for interface
17+
func (b *eruResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
18+
return New(cc, target.Endpoint), nil
19+
}

client/resolver/eru/resolver.go

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package eru
2+
3+
import (
4+
"context"
5+
6+
"github.com/projecteru2/core/client/service_discovery"
7+
log "github.com/sirupsen/logrus"
8+
"google.golang.org/grpc/resolver"
9+
)
10+
11+
type eruResolver struct {
12+
cc resolver.ClientConn
13+
cancel context.CancelFunc
14+
discovery service_discovery.ServiceDiscovery
15+
}
16+
17+
func New(cc resolver.ClientConn, endpoint string) *eruResolver {
18+
r := &eruResolver{
19+
cc: cc,
20+
discovery: service_discovery.New(endpoint),
21+
}
22+
go r.sync()
23+
return r
24+
}
25+
26+
// ResolveNow for interface
27+
func (r *eruResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
28+
29+
// Close for interface
30+
func (r *eruResolver) Close() {
31+
r.cancel()
32+
}
33+
34+
func (r *eruResolver) sync() {
35+
log.Info("[eruResolver] start sync service discovery")
36+
ctx, cancel := context.WithCancel(context.Background())
37+
r.cancel = cancel
38+
defer cancel()
39+
40+
ch, err := r.discovery.Watch(ctx)
41+
if err != nil {
42+
log.Errorf("[eruResolver] failed to watch service status: %v", err)
43+
return
44+
}
45+
for {
46+
select {
47+
case <-ctx.Done():
48+
log.Errorf("[eruResolver] watch interrupted: %v", ctx.Err())
49+
break
50+
case endpoints, ok := <-ch:
51+
if !ok {
52+
log.Error("[eruResolver] watch closed")
53+
break
54+
}
55+
56+
var addresses []resolver.Address
57+
log.Debugf("[eruResolver] update state: %v", endpoints)
58+
for _, ep := range endpoints {
59+
addresses = append(addresses, resolver.Address{Addr: ep})
60+
}
61+
r.cc.UpdateState(resolver.State{Addresses: addresses})
62+
}
63+
}
64+
65+
}

client/resolver/static/builder.go

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package static
2+
3+
import "google.golang.org/grpc/resolver"
4+
5+
type staticResolverBuilder struct{}
6+
7+
func init() {
8+
resolver.Register(&staticResolverBuilder{})
9+
}
10+
11+
// Scheme for interface
12+
func (b *staticResolverBuilder) Scheme() string {
13+
return "static"
14+
}
15+
16+
// Build for interface
17+
func (b *staticResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
18+
return New(cc, target.Endpoint), nil
19+
}

client/resolver/static/resolver.go

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package static
2+
3+
import (
4+
"strings"
5+
6+
"google.golang.org/grpc/resolver"
7+
)
8+
9+
type staticResolver struct {
10+
addresses []resolver.Address
11+
cc resolver.ClientConn
12+
}
13+
14+
func New(cc resolver.ClientConn, endpoints string) *staticResolver {
15+
var addresses []resolver.Address
16+
for _, ep := range strings.Split(endpoints, ",") {
17+
addresses = append(addresses, resolver.Address{Addr: ep})
18+
}
19+
cc.UpdateState(resolver.State{Addresses: addresses})
20+
return &staticResolver{
21+
cc: cc,
22+
addresses: addresses,
23+
}
24+
}
25+
26+
// ResolveNow for interface
27+
func (r *staticResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
28+
29+
// Close for interface
30+
func (r *staticResolver) Close() {}

0 commit comments

Comments
 (0)