Skip to content

Commit

Permalink
Async wf consumer manager should watch its enabled/disabled state instead of relying on restart (#5966)
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir authored May 2, 2024
1 parent 7a7578a commit 0c1bdca
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 23 deletions.
92 changes: 73 additions & 19 deletions service/worker/asyncworkflow/async_workflow_consumer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/uber/cadence/common/asyncworkflow/queue/provider"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
Expand All @@ -58,6 +59,18 @@ func WithRefreshInterval(interval time.Duration) ConsumerManagerOptions {
}
}

// WithEnabledPropertyFn overrides the dynamic config property that controls
// whether the ConsumerManager is enabled. The background loop re-evaluates
// this property on every refresh tick, so flipping the flag takes effect
// without a restart.
func WithEnabledPropertyFn(enabledFn dynamicconfig.BoolPropertyFn) ConsumerManagerOptions {
	return func(cm *ConsumerManager) {
		cm.enabledFn = enabledFn
	}
}

// WithEmitConsumerCountMetricFn overrides the function used to report the
// number of active consumers. Primarily useful in tests that want to observe
// the consumer count without a real metrics client.
func WithEmitConsumerCountMetricFn(fn func(int)) ConsumerManagerOptions {
	return func(c *ConsumerManager) {
		c.emitConsumerCountMetricFn = fn
	}
}

// WithEmitConsumerCountMetrifFn is a misspelled alias kept so existing callers
// keep compiling.
//
// Deprecated: use WithEmitConsumerCountMetricFn instead.
func WithEmitConsumerCountMetrifFn(fn func(int)) ConsumerManagerOptions {
	return WithEmitConsumerCountMetricFn(fn)
}

func NewConsumerManager(
logger log.Logger,
metricsClient metrics.Client,
Expand All @@ -68,6 +81,7 @@ func NewConsumerManager(
) *ConsumerManager {
ctx, cancel := context.WithCancel(context.Background())
cm := &ConsumerManager{
enabledFn: dynamicconfig.GetBoolPropertyFn(true),
logger: logger.WithTags(tag.ComponentAsyncWFConsumptionManager),
metricsClient: metricsClient,
domainCache: domainCache,
Expand All @@ -81,25 +95,30 @@ func NewConsumerManager(
timeSrc: clock.NewRealTimeSource(),
}

cm.emitConsumerCountMetricFn = cm.emitConsumerCountMetric

for _, opt := range options {
opt(cm)
}
return cm
}

type ConsumerManager struct {
logger log.Logger
metricsClient metrics.Client
timeSrc clock.TimeSource
domainCache cache.DomainCache
queueProvider queue.Provider
frontendClient frontend.Client
refreshInterval time.Duration
shutdownTimeout time.Duration
ctx context.Context
cancelFn context.CancelFunc
wg sync.WaitGroup
activeConsumers map[string]provider.Consumer
// all member variables are accessed without any mutex with the assumption that they are only accessed by the background loop
enabledFn dynamicconfig.BoolPropertyFn
logger log.Logger
metricsClient metrics.Client
timeSrc clock.TimeSource
domainCache cache.DomainCache
queueProvider queue.Provider
frontendClient frontend.Client
refreshInterval time.Duration
shutdownTimeout time.Duration
ctx context.Context
cancelFn context.CancelFunc
wg sync.WaitGroup
activeConsumers map[string]provider.Consumer
emitConsumerCountMetricFn func(int)
}

func (c *ConsumerManager) Start() {
Expand All @@ -117,10 +136,7 @@ func (c *ConsumerManager) Stop() {
return
}

for qID, consumer := range c.activeConsumers {
consumer.Stop()
c.logger.Info("Stopped consumer", tag.AsyncWFQueueID(qID))
}
c.stopConsumers()

c.logger.Info("Stopped ConsumerManager")
}
Expand All @@ -132,12 +148,30 @@ func (c *ConsumerManager) run() {
defer ticker.Stop()
c.logger.Info("ConsumerManager background loop started", tag.Dynamic("refresh-interval", c.refreshInterval))

c.refreshConsumers()
enabled := c.enabledFn()
if enabled {
c.refreshConsumers()
} else {
c.logger.Info("ConsumerManager is disabled at the moment so skipping initial refresh")
}

for {
select {
case <-ticker.Chan():
c.refreshConsumers()
previouslyEnabled := enabled
enabled = c.enabledFn()
if enabled != previouslyEnabled {
c.logger.Info("ConsumerManager enabled state changed", tag.Dynamic("enabled", enabled))
}

if enabled {
// refresh consumers every round when consumer is enabled
c.refreshConsumers()
} else {
// stop consumers when consumer is disabled
c.stopConsumers()
}

case <-c.ctx.Done():
c.logger.Info("ConsumerManager background loop stopped because context is done")
return
Expand Down Expand Up @@ -218,7 +252,27 @@ func (c *ConsumerManager) refreshConsumers() {
}

c.logger.Info("Refreshed consumers", tag.Dynamic("consumer-count", len(c.activeConsumers)))
c.metricsClient.Scope(metrics.AsyncWorkflowConsumerScope).UpdateGauge(metrics.AsyncWorkflowConsumerCount, float64(len(c.activeConsumers)))
c.emitConsumerCountMetricFn(len(c.activeConsumers))
}

// emitConsumerCountMetric is the default implementation backing
// emitConsumerCountMetricFn: it publishes the active consumer count as a
// gauge under the async workflow consumer metrics scope.
func (c *ConsumerManager) emitConsumerCountMetric(count int) {
	scope := c.metricsClient.Scope(metrics.AsyncWorkflowConsumerScope)
	scope.UpdateGauge(metrics.AsyncWorkflowConsumerCount, float64(count))
}

// stopConsumers stops every active consumer, removes them from the registry,
// and re-emits the (now zero) consumer count metric. It is a no-op when no
// consumers are active. Only called from the background loop / Stop, so no
// locking is needed.
func (c *ConsumerManager) stopConsumers() {
	active := len(c.activeConsumers)
	if active == 0 {
		return
	}

	c.logger.Info("Stopping all active consumers", tag.Dynamic("consumer-count", active))
	// Deleting while ranging is safe in Go; entries removed here are simply
	// not revisited by the iteration.
	for qID, consumer := range c.activeConsumers {
		consumer.Stop()
		delete(c.activeConsumers, qID)
		c.logger.Info("Stopped consumer", tag.AsyncWFQueueID(qID))
	}

	c.emitConsumerCountMetricFn(len(c.activeConsumers))
	c.logger.Info("Stopped all active consumers", tag.Dynamic("consumer-count", len(c.activeConsumers)))
}

func (c *ConsumerManager) getQueue(cfg types.AsyncWorkflowConfiguration) (provider.Queue, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"errors"
"fmt"
"sort"
"sync/atomic"
"testing"
"time"

Expand All @@ -36,6 +37,7 @@ import (
"github.com/uber/cadence/common/asyncworkflow/queue/provider"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -319,6 +321,84 @@ func TestConsumerManager(t *testing.T) {
}
}

// TestConsumerManagerEnabledDisabled verifies that the ConsumerManager reacts
// to runtime changes of its enabled flag: consumers are created while enabled,
// stopped when the flag turns off, and recreated when it turns back on —
// all without restarting the manager.
func TestConsumerManagerEnabledDisabled(t *testing.T) {
	ctrl := gomock.NewController(t)
	mockTimeSrc := clock.NewMockedTimeSource()
	mockDomainCache := cache.NewMockDomainCache(ctrl)
	mockQueueProvider := queue.NewMockProvider(ctrl)
	// Single domain with async workflows enabled so exactly one consumer is
	// expected whenever the manager itself is enabled.
	dwc := domainWithConfig{
		name: "domain1",
		asyncWFCfg: types.AsyncWorkflowConfiguration{
			Enabled:   true,
			QueueType: "kafka",
			QueueConfig: &types.DataBlob{
				EncodingType: types.EncodingTypeJSON.Ptr(),
				Data:         []byte(`{"brokers":["localhost:9092"],"topics":["test-topic"]}`),
			},
		},
	}

	// AnyTimes on every expectation: the background loop's refresh count
	// depends on timing, so exact call counts are not asserted.
	mockDomainCache.EXPECT().GetAllDomain().Return(toDomainCacheEntries([]domainWithConfig{dwc})).AnyTimes()
	queueMock := provider.NewMockQueue(ctrl)
	queueMock.EXPECT().ID().Return(queueID(dwc.asyncWFCfg)).AnyTimes()
	mockQueueProvider.EXPECT().GetQueue(gomock.Any(), gomock.Any()).Return(queueMock, nil).AnyTimes()
	mockConsumer := provider.NewMockConsumer(ctrl)
	mockConsumer.EXPECT().Start().Return(nil).AnyTimes()
	mockConsumer.EXPECT().Stop().AnyTimes()
	queueMock.EXPECT().CreateConsumer(gomock.Any()).Return(mockConsumer, nil).AnyTimes()

	// Shared with the background goroutine, hence atomic access.
	// consumerMgrEnabled (0/1) drives the enabled property fn; consumerCount
	// captures the value the manager reports via its metric hook.
	var consumerMgrEnabled, consumerCount int32

	// create consumer manager
	cm := NewConsumerManager(
		testlogger.New(t),
		metrics.NewNoopMetricsClient(),
		mockDomainCache,
		mockQueueProvider,
		nil,
		WithTimeSource(mockTimeSrc),
		WithEnabledPropertyFn(func(opts ...dynamicconfig.FilterOption) bool {
			return atomic.LoadInt32(&consumerMgrEnabled) == 1
		}),
		WithEmitConsumerCountMetrifFn(func(count int) {
			atomic.StoreInt32(&consumerCount, int32(count))
		}),
	)

	cm.Start()
	defer cm.Stop()

	// wait for the first round of consumers to be created and verify consumer count
	// NOTE(review): real-time Sleep alongside the mocked time source gives the
	// background goroutine a chance to run; could be flaky on very slow CI.
	atomic.StoreInt32(&consumerMgrEnabled, 1)
	time.Sleep(50 * time.Millisecond)
	t.Log("first round comparison")
	got := atomic.LoadInt32(&consumerCount)
	want := 1 // consumer manager is enabled
	if got != int32(want) {
		t.Fatalf("Consumer count mismatch after first round, want: %v, got: %v", want, got)
	}

	// disable consumer manager and wait for the second round of refresh
	// Advancing the mocked clock fires the refresh ticker inside run().
	atomic.StoreInt32(&consumerMgrEnabled, 0)
	mockTimeSrc.Advance(defaultRefreshInterval)
	time.Sleep(50 * time.Millisecond)
	got = atomic.LoadInt32(&consumerCount)
	want = 0 // all consumers should be stopped when consumer manager is disabled
	if got != int32(want) {
		t.Fatalf("Consumer count mismatch after second round, want: %v, got: %v", want, got)
	}

	// enable consumer manager and wait for the third round of refresh
	atomic.StoreInt32(&consumerMgrEnabled, 1)
	mockTimeSrc.Advance(defaultRefreshInterval)
	time.Sleep(50 * time.Millisecond)
	got = atomic.LoadInt32(&consumerCount)
	want = 1 // consumer manager is enabled
	if got != int32(want) {
		t.Fatalf("Consumer count mismatch after third round, want: %v, got: %v", want, got)
	}
}

func toDomainCacheEntries(domains []domainWithConfig) map[string]*cache.DomainCacheEntry {
result := make(map[string]*cache.DomainCacheEntry, len(domains))
for _, d := range domains {
Expand Down
7 changes: 3 additions & 4 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,8 @@ func (s *Service) Start() {
s.startFailoverManager()
}

if s.config.EnableAsyncWorkflowConsumption() {
cm := s.startAsyncWorkflowConsumerManager()
defer cm.Stop()
}
cm := s.startAsyncWorkflowConsumerManager()
defer cm.Stop()

logger.Info("worker started", tag.ComponentWorker)
<-s.stopC
Expand Down Expand Up @@ -407,6 +405,7 @@ func (s *Service) startAsyncWorkflowConsumerManager() common.Daemon {
s.GetDomainCache(),
s.Resource.GetAsyncWorkflowQueueProvider(),
s.GetFrontendClient(),
asyncworkflow.WithEnabledPropertyFn(s.config.EnableAsyncWorkflowConsumption),
)
cm.Start()
return cm
Expand Down

0 comments on commit 0c1bdca

Please sign in to comment.