Skip to content

Commit da4e6ef

Browse files
jschwinger233CMGS
authored andcommitted
cluster: WatchServiceStatus, RegisterService
1 parent 117b5f7 commit da4e6ef

File tree

10 files changed

+215
-12
lines changed

10 files changed

+215
-12
lines changed

cluster/calcium/calcium.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package calcium
22

33
import (
4+
"context"
45
"strings"
56

67
"github.com/projecteru2/core/cluster"
@@ -21,6 +22,9 @@ type Calcium struct {
2122
store store.Store
2223
scheduler scheduler.Scheduler
2324
source source.Source
25+
watcher *serviceWatcher
26+
27+
cancelServiceHeartbeat context.CancelFunc
2428
}
2529

2630
// New returns a new cluster config
@@ -49,7 +53,7 @@ func New(config types.Config, embeddedStorage bool) (*Calcium, error) {
4953
log.Warn("[Calcium] SCM not set, build API disabled")
5054
}
5155

52-
return &Calcium{store: store, config: config, scheduler: scheduler, source: scm}, err
56+
return &Calcium{store: store, config: config, scheduler: scheduler, source: scm, watcher: &serviceWatcher{}}, err
5357
}
5458

5559
// Finalizer use for defer

cluster/calcium/service.go

+134
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package calcium
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"github.com/projecteru2/core/store"
10+
"github.com/projecteru2/core/types"
11+
"github.com/projecteru2/core/utils"
12+
log "github.com/sirupsen/logrus"
13+
)
14+
15+
type serviceWatcher struct {
16+
once sync.Once
17+
subs sync.Map
18+
}
19+
20+
func (w *serviceWatcher) Start(s store.Store, pushInterval time.Duration) {
21+
w.once.Do(func() {
22+
w.start(s, pushInterval)
23+
})
24+
}
25+
26+
func (w *serviceWatcher) start(s store.Store, pushInterval time.Duration) {
27+
ch, err := s.ServiceStatusStream(context.Background())
28+
if err != nil {
29+
log.Errorf("[WatchServiceStatus] failed to start watch: %v", err)
30+
return
31+
}
32+
33+
go func() {
34+
defer log.Error("[WatchServiceStatus] goroutine exited")
35+
var (
36+
latestStatus types.ServiceStatus
37+
timer *time.Timer = time.NewTimer(pushInterval / 2)
38+
)
39+
for {
40+
select {
41+
case addresses, ok := <-ch:
42+
if !ok {
43+
log.Error("[WatchServiceStatus] watch channel closed")
44+
return
45+
}
46+
47+
latestStatus = types.ServiceStatus{
48+
Addresses: addresses,
49+
Interval: pushInterval,
50+
}
51+
w.dispatch(latestStatus)
52+
53+
case <-timer.C:
54+
w.dispatch(latestStatus)
55+
}
56+
timer.Stop()
57+
timer.Reset(pushInterval / 2)
58+
}
59+
}()
60+
}
61+
62+
func (w *serviceWatcher) dispatch(status types.ServiceStatus) {
63+
w.subs.Range(func(k, v interface{}) bool {
64+
c, ok := v.(chan<- types.ServiceStatus)
65+
if !ok {
66+
log.Error("[WatchServiceStatus] failed to cast channel from map")
67+
return true
68+
}
69+
c <- status
70+
return true
71+
})
72+
}
73+
74+
func (w *serviceWatcher) Subscribe(ch chan<- types.ServiceStatus) uuid.UUID {
75+
id := uuid.New()
76+
_, _ = w.subs.LoadOrStore(id, ch)
77+
return id
78+
}
79+
80+
func (w *serviceWatcher) Unsubscribe(id uuid.UUID) {
81+
w.subs.Delete(id)
82+
}
83+
84+
func (c *Calcium) WatchServiceStatus(ctx context.Context) (<-chan types.ServiceStatus, error) {
85+
ch := make(chan types.ServiceStatus)
86+
c.watcher.Start(c.store, c.config.ServiceDiscoveryPushInterval)
87+
id := c.watcher.Subscribe(ch)
88+
go func() {
89+
<-ctx.Done()
90+
c.watcher.Unsubscribe(id)
91+
close(ch)
92+
}()
93+
return ch, nil
94+
}
95+
96+
func (c *Calcium) RegisterService(ctx context.Context) (unregister func(), err error) {
97+
ctx, cancel := context.WithCancel(ctx)
98+
serviceAddress, err := utils.GetOutboundAddress(c.config.Bind)
99+
if err != nil {
100+
log.Errorf("[RegisterService] failed to get outbound address: %v", err)
101+
return
102+
}
103+
if err = c.store.RegisterService(ctx, serviceAddress, c.config.ServiceHeartbeatInterval); err != nil {
104+
log.Errorf("[RegisterService] failed to register service: %v", err)
105+
return
106+
}
107+
108+
done := make(chan struct{})
109+
go func() {
110+
defer func() {
111+
if err := c.store.UnregisterService(context.Background(), serviceAddress); err != nil {
112+
log.Errorf("[RegisterService] failed to unregister service: %v", err)
113+
}
114+
close(done)
115+
}()
116+
117+
timer := time.NewTicker(c.config.ServiceHeartbeatInterval / 2)
118+
for {
119+
select {
120+
case <-timer.C:
121+
if err := c.store.RegisterService(ctx, serviceAddress, c.config.ServiceHeartbeatInterval); err != nil {
122+
log.Errorf("[RegisterService] failed to register service: %v", err)
123+
}
124+
case <-ctx.Done():
125+
log.Infof("[RegisterService] context done: %v", ctx.Err())
126+
return
127+
}
128+
}
129+
}()
130+
return func() {
131+
cancel()
132+
<-done
133+
}, err
134+
}

cluster/cluster.go

+2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ const (
4444

4545
// Cluster define all interface
4646
type Cluster interface {
47+
// meta service
48+
WatchServiceStatus(context.Context) (<-chan types.ServiceStatus, error)
4749
// meta networks
4850
ListNetworks(ctx context.Context, podname string, driver string) ([]*enginetypes.Network, error)
4951
ConnectNetwork(ctx context.Context, network, target, ipv4, ipv6 string) ([]string, error)

core.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"fmt"
56
"net"
67
"net/http"
@@ -89,18 +90,23 @@ func serve() {
8990
pb.RegisterCoreRPCServer(grpcServer, vibranium)
9091
go func() {
9192
if err := grpcServer.Serve(s); err != nil {
92-
log.Fatalf("start grpc failed %v", err)
93+
log.Fatalf("[main] start grpc failed %v", err)
9394
}
9495
}()
9596
if config.Profile != "" {
9697
http.Handle("/metrics", metrics.Client.ResourceMiddleware(cluster)(promhttp.Handler()))
9798
go func() {
9899
if err := http.ListenAndServe(config.Profile, nil); err != nil {
99-
log.Errorf("start http failed %v", err)
100+
log.Errorf("[main] start http failed %v", err)
100101
}
101102
}()
102103
}
103104

105+
unregisterService, err := cluster.RegisterService(context.Background())
106+
if err != nil {
107+
log.Errorf("[main] failed to register service: %v", err)
108+
return
109+
}
104110
log.Info("[main] Cluster started successfully.")
105111

106112
// wait for unix signals and try to GracefulStop
@@ -109,6 +115,7 @@ func serve() {
109115
sig := <-sigs
110116
log.Infof("[main] Get signal %v.", sig)
111117
close(rpcch)
118+
unregisterService()
112119
grpcServer.GracefulStop()
113120
log.Info("[main] gRPC server gracefully stopped.")
114121

core.yaml.sample

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
log_level: "DEBUG"
22
bind: ":5001"
3+
service_address: 10.22.12.87:5001
34
statsd: "127.0.0.1:8125"
45
profile: ":12346"
56
global_timeout: 300s

rpc/rpc.go

+17
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,23 @@ func (v *Vibranium) Info(ctx context.Context, opts *pb.Empty) (*pb.CoreInfo, err
3939
}, nil
4040
}
4141

42+
// WatchServiceStatus pushes sibling services
43+
func (v *Vibranium) WatchServiceStatus(_ *pb.Empty, stream pb.CoreRPC_WatchServiceStatusServer) (err error) {
44+
ch, err := v.cluster.WatchServiceStatus(stream.Context())
45+
if err != nil {
46+
log.Errorf("[WatchServicesStatus] failed to create watch channel: %v", err)
47+
return err
48+
}
49+
for status := range ch {
50+
s := toRPCServiceStatus(status)
51+
if err = stream.Send(s); err != nil {
52+
v.logUnsentMessages("WatchServicesStatus", s)
53+
return err
54+
}
55+
}
56+
return nil
57+
}
58+
4259
// ListNetworks list networks for pod
4360
func (v *Vibranium) ListNetworks(ctx context.Context, opts *pb.ListNetworkOptions) (*pb.Networks, error) {
4461
networks, err := v.cluster.ListNetworks(ctx, opts.Podname, opts.Driver)

rpc/transform.go

+8
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package rpc
33
import (
44
"bytes"
55
"encoding/json"
6+
"time"
67

78
enginetypes "github.com/projecteru2/core/engine/types"
89
pb "github.com/projecteru2/core/rpc/gen"
@@ -12,6 +13,13 @@ import (
1213
"golang.org/x/net/context"
1314
)
1415

16+
func toRPCServiceStatus(status types.ServiceStatus) *pb.ServiceStatus {
17+
return &pb.ServiceStatus{
18+
Addresses: status.Addresses,
19+
IntervalInSecond: int64(status.Interval / time.Second),
20+
}
21+
}
22+
1523
func toRPCCPUMap(m types.CPUMap) map[string]int32 {
1624
cpu := make(map[string]int32)
1725
for label, value := range m {

types/config.go

+12-9
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,18 @@ import (
66

77
// Config holds eru-core config
88
type Config struct {
9-
LogLevel string `yaml:"log_level" required:"true" default:"INFO"`
10-
Bind string `yaml:"bind" required:"true" default:"5001"` // HTTP API address
11-
LockTimeout time.Duration `yaml:"lock_timeout" required:"true" default:"30s"` // timeout for lock (ttl)
12-
GlobalTimeout time.Duration `yaml:"global_timeout" required:"true" default:"300s"` // timeout for remove, run_and_wait and build, in second
13-
Statsd string `yaml:"statsd"` // statsd host and port
14-
Profile string `yaml:"profile"` // profile ip:port
15-
CertPath string `yaml:"cert_path"` // docker cert files path
16-
Auth AuthConfig `yaml:"auth"` // grpc auth
17-
GRPCConfig GRPCConfig `yaml:"grpc"` // grpc config
9+
LogLevel string `yaml:"log_level" required:"true" default:"INFO"`
10+
Bind string `yaml:"bind" required:"true" default:"5001"` // HTTP API address
11+
ServiceAddress string `yaml:"service_address" required:"true"`
12+
ServiceDiscoveryPushInterval time.Duration `yaml:"service_discovery_interval" required:"true" default:"10s"`
13+
ServiceHeartbeatInterval time.Duration `yaml:"service_heartbeat_interval" required:"true" default:"10s"`
14+
LockTimeout time.Duration `yaml:"lock_timeout" required:"true" default:"30s"` // timeout for lock (ttl)
15+
GlobalTimeout time.Duration `yaml:"global_timeout" required:"true" default:"300s"` // timeout for remove, run_and_wait and build, in second
16+
Statsd string `yaml:"statsd"` // statsd host and port
17+
Profile string `yaml:"profile"` // profile ip:port
18+
CertPath string `yaml:"cert_path"` // docker cert files path
19+
Auth AuthConfig `yaml:"auth"` // grpc auth
20+
GRPCConfig GRPCConfig `yaml:"grpc"` // grpc config
1821

1922
Git GitConfig `yaml:"git"`
2023
Etcd EtcdConfig `yaml:"etcd"`

types/service.go

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package types
2+
3+
import "time"
4+
5+
type ServiceStatus struct {
6+
Addresses []string
7+
Interval time.Duration
8+
}

utils/service.go

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package utils
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"strings"
7+
)
8+
9+
func GetOutboundAddress(bind string) (string, error) {
10+
conn, err := net.Dial("udp", "8.8.8.8:80")
11+
if err != nil {
12+
return "", err
13+
}
14+
defer conn.Close()
15+
16+
localAddr := conn.LocalAddr().(*net.UDPAddr)
17+
port := strings.Split(bind, ":")[1]
18+
return fmt.Sprintf("%s:%s", localAddr.IP, port), nil
19+
}

0 commit comments

Comments
 (0)