Skip to content

Commit e6b5d62

Browse files
jschwinger233CMGS
authored andcommitted
client: service discovery can watch core with auth
1 parent c1c82a6 commit e6b5d62

File tree

12 files changed

+113
-82
lines changed

12 files changed

+113
-82
lines changed

client/client.go

+11-17
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,10 @@ import (
44
"context"
55
"time"
66

7-
log "github.com/sirupsen/logrus"
8-
97
"github.com/projecteru2/core/auth"
108
"github.com/projecteru2/core/client/interceptor"
11-
_ "github.com/projecteru2/core/client/resolver/eru"
12-
_ "github.com/projecteru2/core/client/resolver/static"
9+
_ "github.com/projecteru2/core/client/resolver/eru" // register grpc resolver: eru://
10+
_ "github.com/projecteru2/core/client/resolver/static" // register grpc resolver: static://
1311
pb "github.com/projecteru2/core/rpc/gen"
1412
"github.com/projecteru2/core/types"
1513
"github.com/projecteru2/core/utils"
@@ -24,12 +22,12 @@ type Client struct {
2422
}
2523

2624
// NewClient new a client
27-
func NewClient(ctx context.Context, addr string, authConfig types.AuthConfig) *Client {
28-
client := &Client{
25+
func NewClient(ctx context.Context, addr string, authConfig types.AuthConfig) (*Client, error) {
26+
cc, err := dial(ctx, addr, authConfig)
27+
return &Client{
2928
addr: addr,
30-
conn: dial(ctx, addr, authConfig),
31-
}
32-
return client
29+
conn: cc,
30+
}, err
3331
}
3432

3533
// GetConn return connection
@@ -42,22 +40,18 @@ func (c *Client) GetRPCClient() pb.CoreRPCClient {
4240
return pb.NewCoreRPCClient(c.conn)
4341
}
4442

45-
func dial(ctx context.Context, addr string, authConfig types.AuthConfig) *grpc.ClientConn {
43+
func dial(ctx context.Context, addr string, authConfig types.AuthConfig) (*grpc.ClientConn, error) {
4644
opts := []grpc.DialOption{
4745
grpc.WithInsecure(),
4846
grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: 6 * 60 * time.Second, Timeout: time.Second}),
49-
grpc.WithBalancerName("round_robin"),
47+
grpc.WithBalancerName("round_robin"), // nolint:staticcheck
5048
grpc.WithUnaryInterceptor(interceptor.NewUnaryRetry(interceptor.RetryOptions{Max: 1})),
5149
grpc.WithStreamInterceptor(interceptor.NewStreamRetry(interceptor.RetryOptions{Max: 1})),
5250
}
5351
if authConfig.Username != "" {
5452
opts = append(opts, grpc.WithPerRPCCredentials(auth.NewCredential(authConfig)))
5553
}
5654

57-
target := utils.MakeTarget(addr)
58-
cc, err := grpc.DialContext(ctx, target, opts...)
59-
if err != nil {
60-
log.Panicf("[NewClient] failed to dial grpc %s: %v", addr, err)
61-
}
62-
return cc
55+
target := utils.MakeTarget(addr, authConfig)
56+
return grpc.DialContext(ctx, target, opts...)
6357
}

client/interceptor/retry.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"google.golang.org/grpc"
1111
)
1212

13+
// NewUnaryRetry makes unary RPC retry on error
1314
func NewUnaryRetry(retryOpts RetryOptions) grpc.UnaryClientInterceptor {
1415
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1516
return backoff.Retry(func() error {
@@ -18,11 +19,13 @@ func NewUnaryRetry(retryOpts RetryOptions) grpc.UnaryClientInterceptor {
1819
}
1920
}
2021

22+
// RPCNeedRetry records rpc stream methods to retry
2123
var RPCNeedRetry = map[string]struct{}{
22-
"/pb.CoreRPC/ContainerStatusStream": struct{}{},
23-
"/pb.CoreRPC/WatchServiceStatus": struct{}{},
24+
"/pb.CoreRPC/ContainerStatusStream": {},
25+
"/pb.CoreRPC/WatchServiceStatus": {},
2426
}
2527

28+
// NewStreamRetry make specific stream retry on error
2629
func NewStreamRetry(retryOpts RetryOptions) grpc.StreamClientInterceptor {
2730
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
2831
stream, err := streamer(ctx, desc, cc, method, opts...)

client/interceptor/types.go

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"google.golang.org/grpc"
88
)
99

10+
// RetryOptions .
1011
type RetryOptions struct {
1112
Max int
1213
}

client/resolver/eru/builder.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import "google.golang.org/grpc/resolver"
44

55
type eruResolverBuilder struct{}
66

7-
func init() {
7+
func init() { // nolint
88
resolver.Register(&eruResolverBuilder{})
99
}
1010

@@ -15,5 +15,5 @@ func (b *eruResolverBuilder) Scheme() string {
1515

1616
// Build for interface
1717
func (b *eruResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
18-
return New(cc, target.Endpoint), nil
18+
return New(cc, target.Endpoint, target.Authority), nil
1919
}

client/resolver/eru/resolver.go

+26-16
Original file line numberDiff line numberDiff line change
@@ -2,59 +2,69 @@ package eru
22

33
import (
44
"context"
5+
"strings"
56

6-
"github.com/projecteru2/core/client/service_discovery"
7+
"github.com/projecteru2/core/client/servicediscovery"
8+
"github.com/projecteru2/core/types"
79
log "github.com/sirupsen/logrus"
810
"google.golang.org/grpc/resolver"
911
)
1012

11-
type eruResolver struct {
13+
// Resolver for target eru://{addr}
14+
type Resolver struct {
1215
cc resolver.ClientConn
1316
cancel context.CancelFunc
14-
discovery service_discovery.ServiceDiscovery
17+
discovery servicediscovery.ServiceDiscovery
1518
}
1619

17-
func New(cc resolver.ClientConn, endpoint string) *eruResolver {
18-
r := &eruResolver{
20+
// New Resolver
21+
func New(cc resolver.ClientConn, endpoint string, authority string) *Resolver {
22+
var username, password string
23+
if authority != "" {
24+
parts := strings.Split(authority, ":")
25+
username, password = strings.TrimLeft(parts[0], "@"), parts[1]
26+
}
27+
authConfig := types.AuthConfig{Username: username, Password: password}
28+
r := &Resolver{
1929
cc: cc,
20-
discovery: service_discovery.New(endpoint),
30+
discovery: servicediscovery.New(endpoint, authConfig),
2131
}
2232
go r.sync()
2333
return r
2434
}
2535

2636
// ResolveNow for interface
27-
func (r *eruResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
37+
func (r *Resolver) ResolveNow(_ resolver.ResolveNowOptions) {}
2838

2939
// Close for interface
30-
func (r *eruResolver) Close() {
40+
func (r *Resolver) Close() {
3141
r.cancel()
3242
}
3343

34-
func (r *eruResolver) sync() {
35-
log.Info("[eruResolver] start sync service discovery")
44+
func (r *Resolver) sync() {
45+
log.Info("[EruResolver] start sync service discovery")
3646
ctx, cancel := context.WithCancel(context.Background())
3747
r.cancel = cancel
3848
defer cancel()
3949

4050
ch, err := r.discovery.Watch(ctx)
4151
if err != nil {
42-
log.Errorf("[eruResolver] failed to watch service status: %v", err)
52+
log.Errorf("[EruResolver] failed to watch service status: %v", err)
4353
return
4454
}
4555
for {
4656
select {
4757
case <-ctx.Done():
48-
log.Errorf("[eruResolver] watch interrupted: %v", ctx.Err())
49-
break
58+
log.Errorf("[EruResolver] watch interrupted: %v", ctx.Err())
59+
return
5060
case endpoints, ok := <-ch:
5161
if !ok {
52-
log.Error("[eruResolver] watch closed")
53-
break
62+
log.Error("[EruResolver] watch closed")
63+
return
5464
}
5565

5666
var addresses []resolver.Address
57-
log.Debugf("[eruResolver] update state: %v", endpoints)
67+
log.Debugf("[EruResolver] update state: %v", endpoints)
5868
for _, ep := range endpoints {
5969
addresses = append(addresses, resolver.Address{Addr: ep})
6070
}

client/resolver/static/builder.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import "google.golang.org/grpc/resolver"
44

55
type staticResolverBuilder struct{}
66

7-
func init() {
7+
func init() { // nolint
88
resolver.Register(&staticResolverBuilder{})
99
}
1010

client/resolver/static/resolver.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,27 @@ import (
66
"google.golang.org/grpc/resolver"
77
)
88

9-
type staticResolver struct {
9+
// Resolver for target static://{addr1},{addr2},{addr3}
10+
type Resolver struct {
1011
addresses []resolver.Address
1112
cc resolver.ClientConn
1213
}
1314

14-
func New(cc resolver.ClientConn, endpoints string) *staticResolver {
15+
// New Resolver
16+
func New(cc resolver.ClientConn, endpoints string) *Resolver {
1517
var addresses []resolver.Address
1618
for _, ep := range strings.Split(endpoints, ",") {
1719
addresses = append(addresses, resolver.Address{Addr: ep})
1820
}
1921
cc.UpdateState(resolver.State{Addresses: addresses})
20-
return &staticResolver{
22+
return &Resolver{
2123
cc: cc,
2224
addresses: addresses,
2325
}
2426
}
2527

2628
// ResolveNow for interface
27-
func (r *staticResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
29+
func (r *Resolver) ResolveNow(_ resolver.ResolveNowOptions) {}
2830

2931
// Close for interface
30-
func (r *staticResolver) Close() {}
32+
func (r *Resolver) Close() {}

client/servicediscovery/builder.go

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package servicediscovery
2+
3+
import "google.golang.org/grpc/resolver"
4+
5+
// LBResolverBuilder for service discovery lb
6+
type LBResolverBuilder struct {
7+
updateCh chan []string
8+
}
9+
10+
var lbResolverBuilder *LBResolverBuilder
11+
12+
func init() { // nolint
13+
lbResolverBuilder = &LBResolverBuilder{
14+
updateCh: make(chan []string),
15+
}
16+
resolver.Register(lbResolverBuilder)
17+
}
18+
19+
// Scheme for interface
20+
func (b *LBResolverBuilder) Scheme() string {
21+
return "lb"
22+
}
23+
24+
// Build for interface
25+
func (b *LBResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
26+
return newLBResolver(cc, target.Endpoint, b.updateCh), nil
27+
}

client/service_discovery/eru_service_discovery.go renamed to client/servicediscovery/eru_service_discovery.go

+24-13
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,36 @@
1-
package service_discovery
1+
package servicediscovery
22

33
import (
44
"context"
55
"fmt"
66
"math"
77
"time"
88

9+
"github.com/projecteru2/core/auth"
910
"github.com/projecteru2/core/client/interceptor"
1011
pb "github.com/projecteru2/core/rpc/gen"
12+
"github.com/projecteru2/core/types"
1113
log "github.com/sirupsen/logrus"
1214
"google.golang.org/grpc"
1315
)
1416

15-
type eruServiceDiscovery struct {
16-
endpoint string
17+
// EruServiceDiscovery watches eru service status
18+
type EruServiceDiscovery struct {
19+
endpoint string
20+
authConfig types.AuthConfig
1721
}
1822

19-
func New(endpoint string) *eruServiceDiscovery {
20-
return &eruServiceDiscovery{
21-
endpoint: endpoint,
23+
// New EruServiceDiscovery
24+
func New(endpoint string, authConfig types.AuthConfig) *EruServiceDiscovery {
25+
return &EruServiceDiscovery{
26+
endpoint: endpoint,
27+
authConfig: authConfig,
2228
}
2329
}
2430

25-
func (w *eruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err error) {
26-
cc, err := w.dial(ctx, w.endpoint)
31+
// Watch .
32+
func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err error) {
33+
cc, err := w.dial(ctx, w.endpoint, w.authConfig)
2734
if err != nil {
2835
log.Errorf("[EruServiceWatch] dial failed: %v", err)
2936
return
@@ -36,8 +43,9 @@ func (w *eruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err
3643
watchCtx, cancelWatch := context.WithCancel(ctx)
3744
stream, err := client.WatchServiceStatus(watchCtx, &pb.Empty{})
3845
if err != nil {
39-
log.Errorf("[EruServiceWatch] watch failed: %v", err)
40-
return
46+
log.Errorf("[EruServiceWatch] watch failed, try later: %v", err)
47+
time.Sleep(10 * time.Second)
48+
continue
4149
}
4250
expectedInterval := time.Duration(math.MaxInt64) / time.Second
4351

@@ -52,7 +60,6 @@ func (w *eruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err
5260
case <-cancelTimer:
5361
return
5462
}
55-
5663
}()
5764
status, err := stream.Recv()
5865
close(cancelTimer)
@@ -70,13 +77,17 @@ func (w *eruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err
7077
return ch, nil
7178
}
7279

73-
func (w *eruServiceDiscovery) dial(ctx context.Context, addr string) (*grpc.ClientConn, error) {
80+
func (w *EruServiceDiscovery) dial(ctx context.Context, addr string, authConfig types.AuthConfig) (*grpc.ClientConn, error) {
7481
opts := []grpc.DialOption{
7582
grpc.WithInsecure(),
76-
grpc.WithBalancerName("round_robin"),
83+
grpc.WithBalancerName("round_robin"), // nolint:staticcheck
7784
grpc.WithStreamInterceptor(interceptor.NewStreamRetry(interceptor.RetryOptions{Max: 1})),
7885
}
7986

87+
if authConfig.Username != "" {
88+
opts = append(opts, grpc.WithPerRPCCredentials(auth.NewCredential(authConfig)))
89+
}
90+
8091
target := makeServiceDiscoveryTarget(addr)
8192
return grpc.DialContext(ctx, target, opts...)
8293
}

client/service_discovery/resolver.go renamed to client/servicediscovery/resolver.go

+1-22
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,11 @@
1-
package service_discovery
1+
package servicediscovery
22

33
import (
44
log "github.com/sirupsen/logrus"
55

66
"google.golang.org/grpc/resolver"
77
)
88

9-
type LBResolverBuilder struct {
10-
updateCh chan []string
11-
}
12-
13-
var lbResolverBuilder *LBResolverBuilder
14-
15-
func init() {
16-
lbResolverBuilder = &LBResolverBuilder{
17-
updateCh: make(chan []string),
18-
}
19-
resolver.Register(lbResolverBuilder)
20-
}
21-
22-
func (b *LBResolverBuilder) Scheme() string {
23-
return "lb"
24-
}
25-
26-
func (b *LBResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
27-
return newLBResolver(cc, target.Endpoint, b.updateCh), nil
28-
}
29-
309
type lbResolver struct {
3110
cc resolver.ClientConn
3211
}
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
package service_discovery
1+
package servicediscovery
22

33
import "context"
44

5+
// ServiceDiscovery notifies current core service addresses
56
type ServiceDiscovery interface {
67
Watch(context.Context) (<-chan []string, error)
78
}

0 commit comments

Comments
 (0)