Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor service discovery #251

Merged
merged 1 commit into from
Sep 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"strings"

"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/discovery"
"github.com/projecteru2/core/discovery/helium"
"github.com/projecteru2/core/scheduler"
complexscheduler "github.com/projecteru2/core/scheduler/complex"
"github.com/projecteru2/core/source"
Expand All @@ -21,7 +23,7 @@ type Calcium struct {
store store.Store
scheduler scheduler.Scheduler
source source.Source
watcher *serviceWatcher
watcher discovery.Service
}

// New returns a new cluster config
Expand Down Expand Up @@ -50,7 +52,10 @@ func New(config types.Config, embeddedStorage bool) (*Calcium, error) {
log.Warn("[Calcium] SCM not set, build API disabled")
}

return &Calcium{store: store, config: config, scheduler: scheduler, source: scm, watcher: &serviceWatcher{}}, err
// set watcher
watcher := helium.New(config.GRPCConfig, store)

return &Calcium{store: store, config: config, scheduler: scheduler, source: scm, watcher: watcher}, err
}

// Finalizer use for defer
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -66,7 +65,8 @@ func (c *Calcium) ExecuteContainer(ctx context.Context, opts *types.ExecuteConta

exitData := []byte(exitDataPrefix + strconv.Itoa(execCode))
ch <- &types.AttachContainerMessage{ContainerID: opts.ContainerID, Data: exitData}
log.Infof("[ExecuteContainer] Execuate in container %s complete", utils.ShortID(opts.ContainerID))
log.Infof("[ExecuteContainer] Execuate in container %s complete", opts.ContainerID)
log.Infof("[ExecuteContainer] %v", opts.Commands)
}()

return ch
Expand Down
72 changes: 0 additions & 72 deletions cluster/calcium/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,86 +5,14 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
log "github.com/sirupsen/logrus"
)

// serviceWatcher fans service-status updates from the store out to
// registered subscriber channels. (Legacy in-package implementation,
// replaced by discovery/helium in this change set.)
type serviceWatcher struct {
	once sync.Once // guards start so the watch goroutine runs at most once
	subs sync.Map  // uuid.UUID -> chan<- types.ServiceStatus
}

// Start launches the background watch exactly once; later calls are no-ops.
func (w *serviceWatcher) Start(ctx context.Context, s store.Store, pushInterval time.Duration) {
	launch := func() {
		w.start(ctx, s, pushInterval)
	}
	w.once.Do(launch)
}

// start opens the store's service-status stream and runs a goroutine that
// fans each update out to subscribers, re-pushing the latest status every
// pushInterval so clients refresh before the advertised TTL lapses.
func (w *serviceWatcher) start(ctx context.Context, s store.Store, pushInterval time.Duration) {
	ch, err := s.ServiceStatusStream(ctx)
	if err != nil {
		log.Errorf("[WatchServiceStatus] failed to start watch: %v", err)
		return
	}

	go func() {
		defer log.Error("[WatchServiceStatus] goroutine exited")
		var (
			latestStatus types.ServiceStatus
			timer        *time.Timer = time.NewTimer(pushInterval)
		)
		for {
			select {
			case addresses, ok := <-ch:
				if !ok {
					log.Error("[WatchServiceStatus] watch channel closed")
					return
				}

				// advertise a TTL of twice the push period so a single
				// missed push does not expire the record
				latestStatus = types.ServiceStatus{
					Addresses: addresses,
					Interval:  pushInterval * 2,
				}
				w.dispatch(latestStatus)

			case <-timer.C:
				// periodic re-push of whatever status was seen last
				w.dispatch(latestStatus)
			}
			// NOTE(review): Stop without draining timer.C can leave a stale
			// tick queued, causing one spurious immediate dispatch after
			// Reset (see time.Timer.Reset docs). There is also no ctx.Done()
			// case: cancellation is observed only via the store closing ch.
			timer.Stop()
			timer.Reset(pushInterval)
		}
	}()
}

// dispatch delivers status to every registered subscriber channel; each
// send blocks until that subscriber receives.
func (w *serviceWatcher) dispatch(status types.ServiceStatus) {
	w.subs.Range(func(_, value interface{}) bool {
		sub, valid := value.(chan<- types.ServiceStatus)
		if !valid {
			log.Error("[WatchServiceStatus] failed to cast channel from map")
			return true
		}
		sub <- status
		return true
	})
}

// Subscribe registers ch for status pushes and returns a handle used to
// unsubscribe later.
func (w *serviceWatcher) Subscribe(ch chan<- types.ServiceStatus) uuid.UUID {
	subID := uuid.New()
	w.subs.LoadOrStore(subID, ch)
	return subID
}

// Unsubscribe removes the subscriber registered under id; the channel
// itself is not closed here.
func (w *serviceWatcher) Unsubscribe(id uuid.UUID) {
	w.subs.Delete(id)
}

// WatchServiceStatus returns chan of available service address
func (c *Calcium) WatchServiceStatus(ctx context.Context) (<-chan types.ServiceStatus, error) {
ch := make(chan types.ServiceStatus)
c.watcher.Start(ctx, c.store, c.config.GRPCConfig.ServiceDiscoveryPushInterval)
id := c.watcher.Subscribe(ch)
go func() {
<-ctx.Done()
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/projecteru2/core/discovery/helium"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -59,8 +60,6 @@ func TestWatchServiceStatus(t *testing.T) {
c.config.GRPCConfig.ServiceDiscoveryPushInterval = 500 * time.Millisecond
store := &storemocks.Store{}
c.store = store
c.watcher = &serviceWatcher{}

store.On("ServiceStatusStream", mock.AnythingOfType("*context.emptyCtx")).Return(
func(_ context.Context) chan []string {
ch := make(chan []string)
Expand All @@ -78,6 +77,7 @@ func TestWatchServiceStatus(t *testing.T) {
return ch
}, nil,
)
c.watcher = helium.New(c.config.GRPCConfig, c.store)

ch, err := c.WatchServiceStatus(context.Background())
assert.NoError(t, err)
Expand Down
12 changes: 12 additions & 0 deletions discovery/discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package discovery

import (
"github.com/google/uuid"
"github.com/projecteru2/core/types"
)

// Service is the subscription surface of a service-discovery backend:
// callers register a channel to receive ServiceStatus pushes and later
// cancel the registration with the returned id.
type Service interface {
	// Subscribe registers ch for status pushes and returns a handle
	// used to unsubscribe.
	Subscribe(ch chan<- types.ServiceStatus) uuid.UUID
	// Unsubscribe removes the subscription identified by id.
	Unsubscribe(id uuid.UUID)
}
92 changes: 92 additions & 0 deletions discovery/helium/helium.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package helium

import (
"context"
"sync"
"time"

"github.com/google/uuid"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
log "github.com/sirupsen/logrus"
)

// Helium implements discovery.Service: it streams service-status updates
// from the store and pushes them to gRPC subscribers.
type Helium struct {
	// NOTE(review): embedding sync.Once exports Do on Helium and makes the
	// struct non-copyable; a named `once` field would keep the API cleaner.
	sync.Once
	config types.GRPCConfig
	stor   store.Store
	subs   sync.Map // uuid.UUID -> chan<- types.ServiceStatus
}

// New builds a Helium bound to the given config and store, and starts the
// watch goroutine immediately.
func New(config types.GRPCConfig, stor store.Store) *Helium {
	h := &Helium{
		config: config,
		stor:   stor,
	}
	h.Do(func() {
		// rewrite ctx here, because this will run only once!
		h.start(context.Background())
	})
	return h
}

// Subscribe registers ch to receive status pushes and returns the handle
// to pass to Unsubscribe.
func (h *Helium) Subscribe(ch chan<- types.ServiceStatus) uuid.UUID {
	key := uuid.New()
	_, _ = h.subs.LoadOrStore(key, ch)
	return key
}

// Unsubscribe removes the subscription identified by id; the subscriber's
// channel is not closed here.
func (h *Helium) Unsubscribe(id uuid.UUID) {
	h.subs.Delete(id)
}

// start opens the store's service-status stream and runs a goroutine that
// fans each update out to subscribers. The latest status is re-pushed every
// ServiceDiscoveryPushInterval so clients refresh before the advertised TTL
// (Interval = 2x the push interval) lapses.
//
// Fixes over the previous version: the timer channel is drained before
// Reset (Stop alone can leave a stale tick queued, causing one spurious
// immediate dispatch — see time.Timer.Reset docs), the goroutine honors
// ctx cancellation, and the timer is stopped on exit.
func (h *Helium) start(ctx context.Context) {
	ch, err := h.stor.ServiceStatusStream(ctx)
	if err != nil {
		log.Errorf("[WatchServiceStatus] failed to start watch: %v", err)
		return
	}

	go func() {
		log.Info("[WatchServiceStatus] service discovery start")
		defer log.Error("[WatchServiceStatus] service discovery exited")
		var latestStatus types.ServiceStatus
		timer := time.NewTimer(h.config.ServiceDiscoveryPushInterval)
		defer timer.Stop()
		for {
			select {
			case addresses, ok := <-ch:
				if !ok {
					log.Error("[WatchServiceStatus] watch channel closed")
					return
				}

				// advertise a TTL of twice the push period so a single
				// missed push does not expire the record
				latestStatus = types.ServiceStatus{
					Addresses: addresses,
					Interval:  h.config.ServiceDiscoveryPushInterval * 2,
				}
				h.dispatch(latestStatus)

			case <-timer.C:
				// periodic re-push of whatever status was seen last
				h.dispatch(latestStatus)

			case <-ctx.Done():
				// respect caller cancellation instead of leaking the goroutine
				return
			}
			// Drain a concurrently-fired tick before Reset; otherwise a
			// stale value in timer.C would trigger an immediate extra
			// dispatch on the next iteration.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(h.config.ServiceDiscoveryPushInterval)
		}
	}()
}

// dispatch delivers status to every registered subscriber. Sends block
// until each subscriber receives, so a stalled subscriber stalls the loop.
func (h *Helium) dispatch(status types.ServiceStatus) {
	h.subs.Range(func(_, raw interface{}) bool {
		if out, ok := raw.(chan<- types.ServiceStatus); ok {
			out <- status
			return true
		}
		log.Error("[WatchServiceStatus] failed to cast channel from map")
		return true
	})
}
1 change: 0 additions & 1 deletion store/etcdv3/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func (m *Mercury) ServiceStatusStream(ctx context.Context) (chan []string, error
ch := make(chan []string)
go func() {
defer close(ch)
log.Info("[ServiceStatusStream] start watching service status")
resp, err := m.Get(ctx, fmt.Sprintf(serviceStatusKey, ""), clientv3.WithPrefix())
if err != nil {
log.Errorf("[ServiceStatusStream] failed to get current services: %v", err)
Expand Down