Skip to content

Commit cbde8d3

Browse files
committed
refactor service discovery (#251)
1 parent 30eb518 commit cbde8d3

File tree

8 files changed

+122
-83
lines changed

8 files changed

+122
-83
lines changed

README.md

+7-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ Eru
44
![](https://github.com/projecteru2/core/workflows/golangci-lint/badge.svg)
55
[![Codacy Badge](https://app.codacy.com/project/badge/Grade/69918e0a02ae45c5ae7dfc42bad5cfe5)](https://www.codacy.com/gh/projecteru2/core?utm_source=github.com&utm_medium=referral&utm_content=projecteru2/core&utm_campaign=Badge_Grade)
66

7-
Eru is a stateless, flexible, production-ready cluster scheduler designed to easily integrate into existing workflows. Eru can run any containerized things in long or short time. This project is Eru Core. The Core use for resource allocation and manage containers lifetime.
7+
Eru is a stateless, flexible, production-ready resource scheduler designed to easily integrate into existing systems.
8+
9+
Eru can use multiple engines to run anything for the long or short term.
10+
11+
This project is Eru Core. The Core use for resource allocation and manage resource's lifetime.
812

913
### Testing
1014

@@ -23,11 +27,10 @@ You can use our [footstone](https://hub.docker.com/r/projecteru2/footstone/) ima
2327

2428
#### GRPC
2529

26-
Generate golang & python 3 code
30+
Generate golang grpc definitions.
2731

2832
```shell
2933
go get -u github.com/golang/protobuf/{proto,protoc-gen-go}
30-
pip install -U grpcio-tools
3134
make grpc
3235
```
3336

@@ -60,7 +63,7 @@ docker run -d \
6063

6164
### Build and Deploy by Eru itself
6265

63-
After we implemented bootstrap in eru2, now you can build and deploy eru with [cli](https://github.com/projecteru2/cli) tool.
66+
After we implemented bootstrap in eru, now you can build and deploy eru with [cli](https://github.com/projecteru2/cli) tool.
6467

6568
1. Test source code and build image
6669

cluster/calcium/calcium.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"strings"
55

66
"github.com/projecteru2/core/cluster"
7+
"github.com/projecteru2/core/discovery"
8+
"github.com/projecteru2/core/discovery/helium"
79
"github.com/projecteru2/core/scheduler"
810
complexscheduler "github.com/projecteru2/core/scheduler/complex"
911
"github.com/projecteru2/core/source"
@@ -21,7 +23,7 @@ type Calcium struct {
2123
store store.Store
2224
scheduler scheduler.Scheduler
2325
source source.Source
24-
watcher *serviceWatcher
26+
watcher discovery.Service
2527
}
2628

2729
// New returns a new cluster config
@@ -50,7 +52,10 @@ func New(config types.Config, embeddedStorage bool) (*Calcium, error) {
5052
log.Warn("[Calcium] SCM not set, build API disabled")
5153
}
5254

53-
return &Calcium{store: store, config: config, scheduler: scheduler, source: scm, watcher: &serviceWatcher{}}, err
55+
// set watcher
56+
watcher := helium.New(config.GRPCConfig, store)
57+
58+
return &Calcium{store: store, config: config, scheduler: scheduler, source: scm, watcher: watcher}, err
5459
}
5560

5661
// Finalizer use for defer

cluster/calcium/execute.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66

77
enginetypes "github.com/projecteru2/core/engine/types"
88
"github.com/projecteru2/core/types"
9-
"github.com/projecteru2/core/utils"
109
log "github.com/sirupsen/logrus"
1110
)
1211

@@ -66,7 +65,8 @@ func (c *Calcium) ExecuteContainer(ctx context.Context, opts *types.ExecuteConta
6665

6766
exitData := []byte(exitDataPrefix + strconv.Itoa(execCode))
6867
ch <- &types.AttachContainerMessage{ContainerID: opts.ContainerID, Data: exitData}
69-
log.Infof("[ExecuteContainer] Execuate in container %s complete", utils.ShortID(opts.ContainerID))
68+
log.Infof("[ExecuteContainer] Execuate in container %s complete", opts.ContainerID)
69+
log.Infof("[ExecuteContainer] %v", opts.Commands)
7070
}()
7171

7272
return ch

cluster/calcium/service.go

-72
Original file line numberDiff line numberDiff line change
@@ -5,86 +5,14 @@ import (
55
"sync"
66
"time"
77

8-
"github.com/google/uuid"
9-
"github.com/projecteru2/core/store"
108
"github.com/projecteru2/core/types"
119
"github.com/projecteru2/core/utils"
1210
log "github.com/sirupsen/logrus"
1311
)
1412

15-
type serviceWatcher struct {
16-
once sync.Once
17-
subs sync.Map
18-
}
19-
20-
func (w *serviceWatcher) Start(ctx context.Context, s store.Store, pushInterval time.Duration) {
21-
w.once.Do(func() {
22-
w.start(ctx, s, pushInterval)
23-
})
24-
}
25-
26-
func (w *serviceWatcher) start(ctx context.Context, s store.Store, pushInterval time.Duration) {
27-
ch, err := s.ServiceStatusStream(ctx)
28-
if err != nil {
29-
log.Errorf("[WatchServiceStatus] failed to start watch: %v", err)
30-
return
31-
}
32-
33-
go func() {
34-
defer log.Error("[WatchServiceStatus] goroutine exited")
35-
var (
36-
latestStatus types.ServiceStatus
37-
timer *time.Timer = time.NewTimer(pushInterval)
38-
)
39-
for {
40-
select {
41-
case addresses, ok := <-ch:
42-
if !ok {
43-
log.Error("[WatchServiceStatus] watch channel closed")
44-
return
45-
}
46-
47-
latestStatus = types.ServiceStatus{
48-
Addresses: addresses,
49-
Interval: pushInterval * 2,
50-
}
51-
w.dispatch(latestStatus)
52-
53-
case <-timer.C:
54-
w.dispatch(latestStatus)
55-
}
56-
timer.Stop()
57-
timer.Reset(pushInterval)
58-
}
59-
}()
60-
}
61-
62-
func (w *serviceWatcher) dispatch(status types.ServiceStatus) {
63-
w.subs.Range(func(k, v interface{}) bool {
64-
c, ok := v.(chan<- types.ServiceStatus)
65-
if !ok {
66-
log.Error("[WatchServiceStatus] failed to cast channel from map")
67-
return true
68-
}
69-
c <- status
70-
return true
71-
})
72-
}
73-
74-
func (w *serviceWatcher) Subscribe(ch chan<- types.ServiceStatus) uuid.UUID {
75-
id := uuid.New()
76-
_, _ = w.subs.LoadOrStore(id, ch)
77-
return id
78-
}
79-
80-
func (w *serviceWatcher) Unsubscribe(id uuid.UUID) {
81-
w.subs.Delete(id)
82-
}
83-
8413
// WatchServiceStatus returns chan of available service address
8514
func (c *Calcium) WatchServiceStatus(ctx context.Context) (<-chan types.ServiceStatus, error) {
8615
ch := make(chan types.ServiceStatus)
87-
c.watcher.Start(ctx, c.store, c.config.GRPCConfig.ServiceDiscoveryPushInterval)
8816
id := c.watcher.Subscribe(ch)
8917
go func() {
9018
<-ctx.Done()

cluster/calcium/service_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/projecteru2/core/discovery/helium"
1011
storemocks "github.com/projecteru2/core/store/mocks"
1112
"github.com/stretchr/testify/assert"
1213
"github.com/stretchr/testify/mock"
@@ -59,8 +60,6 @@ func TestWatchServiceStatus(t *testing.T) {
5960
c.config.GRPCConfig.ServiceDiscoveryPushInterval = 500 * time.Millisecond
6061
store := &storemocks.Store{}
6162
c.store = store
62-
c.watcher = &serviceWatcher{}
63-
6463
store.On("ServiceStatusStream", mock.AnythingOfType("*context.emptyCtx")).Return(
6564
func(_ context.Context) chan []string {
6665
ch := make(chan []string)
@@ -78,6 +77,7 @@ func TestWatchServiceStatus(t *testing.T) {
7877
return ch
7978
}, nil,
8079
)
80+
c.watcher = helium.New(c.config.GRPCConfig, c.store)
8181

8282
ch, err := c.WatchServiceStatus(context.Background())
8383
assert.NoError(t, err)

discovery/discovery.go

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package discovery
2+
3+
import (
4+
"github.com/google/uuid"
5+
"github.com/projecteru2/core/types"
6+
)
7+
8+
// Service .
9+
type Service interface {
10+
Subscribe(ch chan<- types.ServiceStatus) uuid.UUID
11+
Unsubscribe(id uuid.UUID)
12+
}

discovery/helium/helium.go

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package helium
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"github.com/projecteru2/core/store"
10+
"github.com/projecteru2/core/types"
11+
log "github.com/sirupsen/logrus"
12+
)
13+
14+
// Helium .
15+
type Helium struct {
16+
sync.Once
17+
config types.GRPCConfig
18+
stor store.Store
19+
subs sync.Map
20+
}
21+
22+
// New .
23+
func New(config types.GRPCConfig, stor store.Store) *Helium {
24+
h := &Helium{}
25+
h.config = config
26+
h.stor = stor
27+
h.Do(func() {
28+
h.start(context.Background()) // rewrite ctx here, because this will run only once!
29+
})
30+
return h
31+
}
32+
33+
// Subscribe .
34+
func (h *Helium) Subscribe(ch chan<- types.ServiceStatus) uuid.UUID {
35+
id := uuid.New()
36+
_, _ = h.subs.LoadOrStore(id, ch)
37+
return id
38+
}
39+
40+
// Unsubscribe .
41+
func (h *Helium) Unsubscribe(id uuid.UUID) {
42+
h.subs.Delete(id)
43+
}
44+
45+
func (h *Helium) start(ctx context.Context) {
46+
ch, err := h.stor.ServiceStatusStream(ctx)
47+
if err != nil {
48+
log.Errorf("[WatchServiceStatus] failed to start watch: %v", err)
49+
return
50+
}
51+
52+
go func() {
53+
log.Info("[WatchServiceStatus] service discovery start")
54+
defer log.Error("[WatchServiceStatus] service discovery exited")
55+
var (
56+
latestStatus types.ServiceStatus
57+
timer *time.Timer = time.NewTimer(h.config.ServiceDiscoveryPushInterval)
58+
)
59+
for {
60+
select {
61+
case addresses, ok := <-ch:
62+
if !ok {
63+
log.Error("[WatchServiceStatus] watch channel closed")
64+
return
65+
}
66+
67+
latestStatus = types.ServiceStatus{
68+
Addresses: addresses,
69+
Interval: h.config.ServiceDiscoveryPushInterval * 2,
70+
}
71+
h.dispatch(latestStatus)
72+
73+
case <-timer.C:
74+
h.dispatch(latestStatus)
75+
}
76+
timer.Stop()
77+
timer.Reset(h.config.ServiceDiscoveryPushInterval)
78+
}
79+
}()
80+
}
81+
82+
func (h *Helium) dispatch(status types.ServiceStatus) {
83+
h.subs.Range(func(k, v interface{}) bool {
84+
c, ok := v.(chan<- types.ServiceStatus)
85+
if !ok {
86+
log.Error("[WatchServiceStatus] failed to cast channel from map")
87+
return true
88+
}
89+
c <- status
90+
return true
91+
})
92+
}

store/etcdv3/service.go

-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ func (m *Mercury) ServiceStatusStream(ctx context.Context) (chan []string, error
4141
ch := make(chan []string)
4242
go func() {
4343
defer close(ch)
44-
log.Info("[ServiceStatusStream] start watching service status")
4544
resp, err := m.Get(ctx, fmt.Sprintf(serviceStatusKey, ""), clientv3.WithPrefix())
4645
if err != nil {
4746
log.Errorf("[ServiceStatusStream] failed to get current services: %v", err)

0 commit comments

Comments
 (0)