diff --git a/service/worker/asyncworkflow/async_workflow_consumer_manager.go b/service/worker/asyncworkflow/async_workflow_consumer_manager.go
index 1a0379c30ad..52f91828ee2 100644
--- a/service/worker/asyncworkflow/async_workflow_consumer_manager.go
+++ b/service/worker/asyncworkflow/async_workflow_consumer_manager.go
@@ -33,6 +33,7 @@ import (
 	"github.com/uber/cadence/common/asyncworkflow/queue/provider"
 	"github.com/uber/cadence/common/cache"
 	"github.com/uber/cadence/common/clock"
+	"github.com/uber/cadence/common/dynamicconfig"
 	"github.com/uber/cadence/common/log"
 	"github.com/uber/cadence/common/log/tag"
 	"github.com/uber/cadence/common/metrics"
@@ -58,6 +59,18 @@ func WithRefreshInterval(interval time.Duration) ConsumerManagerOptions {
 	}
 }
 
+func WithEnabledPropertyFn(enabledFn dynamicconfig.BoolPropertyFn) ConsumerManagerOptions {
+	return func(c *ConsumerManager) {
+		c.enabledFn = enabledFn
+	}
+}
+
+func WithEmitConsumerCountMetricFn(fn func(int)) ConsumerManagerOptions {
+	return func(c *ConsumerManager) {
+		c.emitConsumerCountMetricFn = fn
+	}
+}
+
 func NewConsumerManager(
 	logger log.Logger,
 	metricsClient metrics.Client,
@@ -68,6 +81,7 @@ func NewConsumerManager(
 ) *ConsumerManager {
 	ctx, cancel := context.WithCancel(context.Background())
 	cm := &ConsumerManager{
+		enabledFn:       dynamicconfig.GetBoolPropertyFn(true),
 		logger:          logger.WithTags(tag.ComponentAsyncWFConsumptionManager),
 		metricsClient:   metricsClient,
 		domainCache:     domainCache,
@@ -81,6 +95,8 @@ func NewConsumerManager(
 		timeSrc:         clock.NewRealTimeSource(),
 	}
 
+	cm.emitConsumerCountMetricFn = cm.emitConsumerCountMetric
+
 	for _, opt := range options {
 		opt(cm)
 	}
@@ -88,18 +104,21 @@ func NewConsumerManager(
 }
 
 type ConsumerManager struct {
-	logger          log.Logger
-	metricsClient   metrics.Client
-	timeSrc         clock.TimeSource
-	domainCache     cache.DomainCache
-	queueProvider   queue.Provider
-	frontendClient  frontend.Client
-	refreshInterval time.Duration
-	shutdownTimeout time.Duration
-	ctx             context.Context
-	cancelFn        context.CancelFunc
-	wg              sync.WaitGroup
-	activeConsumers map[string]provider.Consumer
+	// All member variables are accessed without a mutex, on the assumption that they are only touched by the background loop.
+	enabledFn                 dynamicconfig.BoolPropertyFn
+	logger                    log.Logger
+	metricsClient             metrics.Client
+	timeSrc                   clock.TimeSource
+	domainCache               cache.DomainCache
+	queueProvider             queue.Provider
+	frontendClient            frontend.Client
+	refreshInterval           time.Duration
+	shutdownTimeout           time.Duration
+	ctx                       context.Context
+	cancelFn                  context.CancelFunc
+	wg                        sync.WaitGroup
+	activeConsumers           map[string]provider.Consumer
+	emitConsumerCountMetricFn func(int)
 }
 
 func (c *ConsumerManager) Start() {
@@ -117,10 +136,7 @@ func (c *ConsumerManager) Stop() {
 		return
 	}
 
-	for qID, consumer := range c.activeConsumers {
-		consumer.Stop()
-		c.logger.Info("Stopped consumer", tag.AsyncWFQueueID(qID))
-	}
+	c.stopConsumers()
 
 	c.logger.Info("Stopped ConsumerManager")
 }
@@ -132,12 +148,30 @@ func (c *ConsumerManager) run() {
 	defer ticker.Stop()
 	c.logger.Info("ConsumerManager background loop started", tag.Dynamic("refresh-interval", c.refreshInterval))
 
-	c.refreshConsumers()
+	enabled := c.enabledFn()
+	if enabled {
+		c.refreshConsumers()
+	} else {
+		c.logger.Info("ConsumerManager is currently disabled, skipping initial refresh")
+	}
 
 	for {
 		select {
 		case <-ticker.Chan():
-			c.refreshConsumers()
+			previouslyEnabled := enabled
+			enabled = c.enabledFn()
+			if enabled != previouslyEnabled {
+				c.logger.Info("ConsumerManager enabled state changed", tag.Dynamic("enabled", enabled))
+			}
+
+			if enabled {
+				// refresh consumers every round while the manager is enabled
+				c.refreshConsumers()
+			} else {
+				// stop all consumers while the manager is disabled
+				c.stopConsumers()
+			}
+
 		case <-c.ctx.Done():
 			c.logger.Info("ConsumerManager background loop stopped because context is done")
 			return
@@ -218,7 +252,27 @@ func (c *ConsumerManager) refreshConsumers() {
 	}
 
 	c.logger.Info("Refreshed consumers", tag.Dynamic("consumer-count", len(c.activeConsumers)))
-	c.metricsClient.Scope(metrics.AsyncWorkflowConsumerScope).UpdateGauge(metrics.AsyncWorkflowConsumerCount, float64(len(c.activeConsumers)))
+	c.emitConsumerCountMetricFn(len(c.activeConsumers))
+}
+
+func (c *ConsumerManager) emitConsumerCountMetric(count int) {
+	c.metricsClient.Scope(metrics.AsyncWorkflowConsumerScope).UpdateGauge(metrics.AsyncWorkflowConsumerCount, float64(count))
+}
+
+func (c *ConsumerManager) stopConsumers() {
+	if len(c.activeConsumers) == 0 {
+		return
+	}
+
+	c.logger.Info("Stopping all active consumers", tag.Dynamic("consumer-count", len(c.activeConsumers)))
+	for qID, consumer := range c.activeConsumers {
+		consumer.Stop()
+		c.logger.Info("Stopped consumer", tag.AsyncWFQueueID(qID))
+		delete(c.activeConsumers, qID)
+	}
+
+	c.emitConsumerCountMetricFn(len(c.activeConsumers))
+	c.logger.Info("Stopped all active consumers", tag.Dynamic("consumer-count", len(c.activeConsumers)))
 }
 
 func (c *ConsumerManager) getQueue(cfg types.AsyncWorkflowConfiguration) (provider.Queue, error) {
diff --git a/service/worker/asyncworkflow/async_workflow_consumer_manager_test.go b/service/worker/asyncworkflow/async_workflow_consumer_manager_test.go
index 973fcba3e57..bac72d4651e 100644
--- a/service/worker/asyncworkflow/async_workflow_consumer_manager_test.go
+++ b/service/worker/asyncworkflow/async_workflow_consumer_manager_test.go
@@ -26,6 +26,7 @@ import (
 	"errors"
 	"fmt"
 	"sort"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -36,6 +37,7 @@ import (
 	"github.com/uber/cadence/common/asyncworkflow/queue/provider"
 	"github.com/uber/cadence/common/cache"
 	"github.com/uber/cadence/common/clock"
+	"github.com/uber/cadence/common/dynamicconfig"
 	"github.com/uber/cadence/common/log/testlogger"
 	"github.com/uber/cadence/common/metrics"
 	"github.com/uber/cadence/common/persistence"
@@ -319,6 +321,84 @@ func TestConsumerManager(t *testing.T) {
 	}
 }
 
+func TestConsumerManagerEnabledDisabled(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	mockTimeSrc := clock.NewMockedTimeSource()
+	mockDomainCache := cache.NewMockDomainCache(ctrl)
+	mockQueueProvider := queue.NewMockProvider(ctrl)
+	dwc := domainWithConfig{
+		name: "domain1",
+		asyncWFCfg: types.AsyncWorkflowConfiguration{
+			Enabled:     true,
+			QueueType:   "kafka",
+			QueueConfig: &types.DataBlob{
+				EncodingType: types.EncodingTypeJSON.Ptr(),
+				Data:         []byte(`{"brokers":["localhost:9092"],"topics":["test-topic"]}`),
+			},
+		},
+	}
+
+	mockDomainCache.EXPECT().GetAllDomain().Return(toDomainCacheEntries([]domainWithConfig{dwc})).AnyTimes()
+	queueMock := provider.NewMockQueue(ctrl)
+	queueMock.EXPECT().ID().Return(queueID(dwc.asyncWFCfg)).AnyTimes()
+	mockQueueProvider.EXPECT().GetQueue(gomock.Any(), gomock.Any()).Return(queueMock, nil).AnyTimes()
+	mockConsumer := provider.NewMockConsumer(ctrl)
+	mockConsumer.EXPECT().Start().Return(nil).AnyTimes()
+	mockConsumer.EXPECT().Stop().AnyTimes()
+	queueMock.EXPECT().CreateConsumer(gomock.Any()).Return(mockConsumer, nil).AnyTimes()
+
+	var consumerMgrEnabled, consumerCount int32
+
+	// create consumer manager
+	cm := NewConsumerManager(
+		testlogger.New(t),
+		metrics.NewNoopMetricsClient(),
+		mockDomainCache,
+		mockQueueProvider,
+		nil,
+		WithTimeSource(mockTimeSrc),
+		WithEnabledPropertyFn(func(opts ...dynamicconfig.FilterOption) bool {
+			return atomic.LoadInt32(&consumerMgrEnabled) == 1
+		}),
+		WithEmitConsumerCountMetricFn(func(count int) {
+			atomic.StoreInt32(&consumerCount, int32(count))
+		}),
+	)
+
+	atomic.StoreInt32(&consumerMgrEnabled, 1) // enable before Start so the initial refresh sees it
+	cm.Start()
+	defer cm.Stop()
+
+	// wait for the first round of consumers to be created and verify the consumer count
+	time.Sleep(50 * time.Millisecond)
+	t.Log("first round comparison")
+	got := atomic.LoadInt32(&consumerCount)
+	want := 1 // consumer manager is enabled
+	if got != int32(want) {
+		t.Fatalf("Consumer count mismatch after first round, want: %v, got: %v", want, got)
+	}
+
+	// disable the consumer manager and wait for the second round of refresh
+	atomic.StoreInt32(&consumerMgrEnabled, 0)
+	mockTimeSrc.Advance(defaultRefreshInterval)
+	time.Sleep(50 * time.Millisecond)
+	got = atomic.LoadInt32(&consumerCount)
+	want = 0 // all consumers should be stopped while the consumer manager is disabled
+	if got != int32(want) {
+		t.Fatalf("Consumer count mismatch after second round, want: %v, got: %v", want, got)
+	}
+
+	// re-enable the consumer manager and wait for the third round of refresh
+	atomic.StoreInt32(&consumerMgrEnabled, 1)
+	mockTimeSrc.Advance(defaultRefreshInterval)
+	time.Sleep(50 * time.Millisecond)
+	got = atomic.LoadInt32(&consumerCount)
+	want = 1 // consumer manager is enabled again
+	if got != int32(want) {
+		t.Fatalf("Consumer count mismatch after third round, want: %v, got: %v", want, got)
+	}
+}
+
 func toDomainCacheEntries(domains []domainWithConfig) map[string]*cache.DomainCacheEntry {
 	result := make(map[string]*cache.DomainCacheEntry, len(domains))
 	for _, d := range domains {
diff --git a/service/worker/service.go b/service/worker/service.go
index 54d5b3a30aa..7f8afc4dc10 100644
--- a/service/worker/service.go
+++ b/service/worker/service.go
@@ -236,10 +236,8 @@ func (s *Service) Start() {
 		s.startFailoverManager()
 	}
 
-	if s.config.EnableAsyncWorkflowConsumption() {
-		cm := s.startAsyncWorkflowConsumerManager()
-		defer cm.Stop()
-	}
+	cm := s.startAsyncWorkflowConsumerManager()
+	defer cm.Stop()
 
 	logger.Info("worker started", tag.ComponentWorker)
 	<-s.stopC
@@ -407,6 +405,7 @@ func (s *Service) startAsyncWorkflowConsumerManager() common.Daemon {
 		s.GetDomainCache(),
 		s.Resource.GetAsyncWorkflowQueueProvider(),
 		s.GetFrontendClient(),
+		asyncworkflow.WithEnabledPropertyFn(s.config.EnableAsyncWorkflowConsumption),
 	)
 	cm.Start()
 	return cm
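
Aside (not part of the diff): the heart of this change is the polling pattern in run(): re-evaluate a dynamicconfig.BoolPropertyFn on every tick, refresh consumers while it reports true, and tear them all down as soon as it reports false, so flipping EnableAsyncWorkflowConsumption takes effect within one refresh interval and without a worker restart. The sketch below reduces that loop to a self-contained program; every name in it (enabledFlag, enabledFn, manager, refresh, stopAll) is hypothetical scaffolding for illustration, not a Cadence API.

// Sketch of the enable/disable polling loop, using only the standard library.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// enabledFlag plays the role of the dynamic config value; enabledFn plays the
// role of dynamicconfig.BoolPropertyFn, re-evaluated on every tick.
var enabledFlag int32 = 1

func enabledFn() bool { return atomic.LoadInt32(&enabledFlag) == 1 }

// manager stands in for ConsumerManager; it only tracks whether consumers run.
type manager struct{ running bool }

func (m *manager) refresh() {
	if !m.running {
		fmt.Println("starting consumers")
		m.running = true
	}
}

func (m *manager) stopAll() {
	if m.running {
		fmt.Println("stopping consumers")
		m.running = false
	}
}

func main() {
	// Simulate a dynamic-config flip: disable after 35ms, re-enable after 70ms.
	go func() {
		time.Sleep(35 * time.Millisecond)
		atomic.StoreInt32(&enabledFlag, 0)
		time.Sleep(35 * time.Millisecond)
		atomic.StoreInt32(&enabledFlag, 1)
	}()

	m := &manager{}
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	done := time.After(120 * time.Millisecond)

	// Initial refresh only if enabled, mirroring run() above.
	enabled := enabledFn()
	if enabled {
		m.refresh()
	}

	for {
		select {
		case <-ticker.C:
			previouslyEnabled := enabled
			enabled = enabledFn()
			if enabled != previouslyEnabled {
				fmt.Println("enabled state changed:", enabled)
			}
			if enabled {
				m.refresh() // keep consumers in sync while enabled
			} else {
				m.stopAll() // tear everything down while disabled
			}
		case <-done:
			return
		}
	}
}

A note on the metric hook: injecting the gauge emission as emitConsumerCountMetricFn, defaulting to the real metrics client but overridable through an option, is what lets TestConsumerManagerEnabledDisabled assert on consumer counts without standing up a metrics backend.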