From 50cb9298ed80ac393a8dd71fb74f20fa8ab18484 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Fri, 17 Mar 2023 15:00:38 -0700 Subject: [PATCH 01/29] large workflow hot shard detection --- common/dynamicconfig/constants.go | 31 ++++++++++++++++++++++- common/metrics/defs.go | 17 ++++++++++++- service/history/config/config.go | 15 ++++++++--- service/history/execution/context.go | 5 ++-- service/history/execution/context_util.go | 27 ++++++++++++++++++++ 5 files changed, 87 insertions(+), 8 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index d4a46bbc209..02bc6b3e109 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1346,7 +1346,21 @@ const ( // Value type: Int // Default value: 100 SampleLoggingRate - + // LargeShardHistorySizeMetricThreshold defines the threshold for what consititutes a large history storage size to alert on + // KeyName: system.largeShardHistorySizeMetricThreshold + // Value type: Int + // Default value: 10485760 + LargeShardHistorySizeMetricThreshold + // LargeShardHistoryEventMetricThreshold defines the threshold for what consititutes a large history event size to alert on + // KeyName: system.largeShardHistoryEventMetricThreshold + // Value type: Int + // Default value: 10000 + LargeShardHistoryEventMetricThreshold + // LargeShardHistoryBlobMetricThreshold defines the threshold for what consititutes a large history blob size to alert on + // KeyName: system.largeShardHistoryBlobMetricThreshold + // Value type: Int + // Default value: 262144 + LargeShardHistoryBlobMetricThreshold // LastIntKey must be the last one in this const group LastIntKey ) @@ -3454,6 +3468,21 @@ var IntKeys = map[IntKey]DynamicInt{ Description: "The rate for which sampled logs are logged at. 
100 means 1/100 is logged", DefaultValue: 100, }, + LargeShardHistorySizeMetricThreshold: DynamicInt{ + KeyName: "system.largeShardHistorySizeMetricThreshold", + Description: "defines the threshold for what consititutes a large history size to alert on, default is 10mb", + DefaultValue: 10485760, + }, + LargeShardHistoryEventMetricThreshold: DynamicInt{ + KeyName: "system.largeShardHistoryEventMetricThreshold", + Description: "defines the threshold for what consititutes a large history event length to alert on, default is 10000", + DefaultValue: 10000, + }, + LargeShardHistoryBlobMetricThreshold: DynamicInt{ + KeyName: "system.largeShardHistoryEventMetricThreshold", + Description: "defines the threshold for what consititutes a large history blob write to alert on, default is 1/4mb", + DefaultValue: 262144, + }, } var BoolKeys = map[BoolKey]DynamicBool{ diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 99de5345ad5..ef4104ef0d8 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -1157,7 +1157,12 @@ const ( HistoryReplicationV2TaskScope // SyncActivityTaskScope is the scope used by sync activity information processing SyncActivityTaskScope - + // LargeExecutionSizeShardScope is the scope to track large history size for hotshard detection + LargeExecutionSizeShardScope + // LargeExecutionCountShardScope is the scope to track large history count for hotshard detection + LargeExecutionCountShardScope + // LargeExecutionBlobShardScope is the scope to track large blobs for hotshard detection + LargeExecutionBlobShardScope NumHistoryScopes ) @@ -1750,6 +1755,9 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ FailoverMarkerScope: {operation: "FailoverMarker"}, HistoryReplicationV2TaskScope: {operation: "HistoryReplicationV2Task"}, SyncActivityTaskScope: {operation: "SyncActivityTask"}, + LargeExecutionSizeShardScope: {operation: "LargeExecutionSizeShard"}, + LargeExecutionCountShardScope: {operation: "LargeExecutionCountShard"}, + 
LargeExecutionBlobShardScope: {operation: "LargeExecutionBlobShard"}, }, // Matching Scope Names Matching: { @@ -1978,6 +1986,10 @@ const ( ParentClosePolicyProcessorSuccess ParentClosePolicyProcessorFailures + LargeHistoryBlobCount + LargeHistoryEventCount + LargeHistorySizeCount + NumCommonMetrics // Needs to be last on this list for iota numbering ) @@ -2844,6 +2856,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ ReplicationTasksCount: {metricName: "replication_tasks_count", metricType: Timer}, WorkflowVersionCount: {metricName: "workflow_version_count", metricType: Gauge}, WorkflowTypeCount: {metricName: "workflow_type_count", metricType: Gauge}, + LargeHistoryBlobCount: {metricName: "large_history_blob_count", metricType: Counter}, + LargeHistoryEventCount: {metricName: "large_history_event_count", metricType: Counter}, + LargeHistorySizeCount: {metricName: "large_history_size_count", metricType: Counter}, }, Matching: { PollSuccessPerTaskListCounter: {metricName: "poll_success_per_tl", metricRollupName: "poll_success"}, diff --git a/service/history/config/config.go b/service/history/config/config.go index 9fc2e77f909..41b38c16c91 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -317,8 +317,12 @@ type Config struct { EnableDebugMode bool // note that this value is initialized once on service start EnableTaskInfoLogByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter - SampleLoggingRate dynamicconfig.IntPropertyFn - EnableShardIDMetrics dynamicconfig.BoolPropertyFn + // Hotshard stuff + SampleLoggingRate dynamicconfig.IntPropertyFn + EnableShardIDMetrics dynamicconfig.BoolPropertyFn + LargeShardHistorySizeMetricThreshold dynamicconfig.IntPropertyFn + LargeShardHistoryEventMetricThreshold dynamicconfig.IntPropertyFn + LargeShardHistoryBlobMetricThreshold dynamicconfig.IntPropertyFn } // New returns new service config with default values @@ -556,8 +560,11 @@ func New(dc *dynamicconfig.Collection, 
numberOfShards int, maxMessageSize int, s EnableDebugMode: dc.GetBoolProperty(dynamicconfig.EnableDebugMode)(), EnableTaskInfoLogByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.HistoryEnableTaskInfoLogByDomainID), - SampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate), - EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics), + SampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate), + EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics), + LargeShardHistorySizeMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistorySizeMetricThreshold), + LargeShardHistoryEventMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryEventMetricThreshold), + LargeShardHistoryBlobMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryBlobMetricThreshold), } return cfg diff --git a/service/history/execution/context.go b/service/history/execution/context.go index e8ecac3bde1..1edfa5cc0b1 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -706,7 +706,6 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( currentWorkflowTransactionPolicy TransactionPolicy, newWorkflowTransactionPolicy *TransactionPolicy, ) (retError error) { - defer func() { if retError != nil { c.Clear() @@ -720,9 +719,9 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( if err != nil { return err } - var persistedBlobs events.PersistedBlobs currentWorkflowSize := c.GetHistorySize() + currentWorkflowHistorySize := c.mutableState.GetNextEventID() - 1 for _, workflowEvents := range currentWorkflowEventsSeq { blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents) if err != nil { @@ -852,6 +851,8 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( domainName, resp.MutableStateUpdateSessionStats, ) + emitLargeWorkflowShardIDStats(c.logger, c.metricsClient, c.shard.GetConfig(), string(c.shard.GetShardID()), domainName, 
c.workflowExecution.WorkflowID, + c.stats.HistorySize, c.mutableState.GetNextEventID()-1, currentWorkflowSize, currentWorkflowHistorySize, c.stats.HistorySize-currentWorkflowSize) // emit workflow completion stats if any if currentWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted { if event, err := c.mutableState.GetCompletionEvent(ctx); err == nil { diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index 8ec94696eef..dda224389ad 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -21,6 +21,8 @@ package execution import ( + "github.com/uber/cadence/service/history/config" + "strconv" "time" "github.com/uber/cadence/common/log" @@ -30,6 +32,31 @@ import ( "github.com/uber/cadence/common/types" ) +func emitLargeWorkflowShardIDStats(logger log.Logger, metricsClient metrics.Client, config *config.Config, shardID string, + domainName string, workflowID string, newHistorySize int64, newHistoryCount int64, oldHistorySize int64, oldHistoryCount int64, blobSize int64) { + if config.EnableShardIDMetrics() { + shardIDInt, _ := strconv.Atoi(shardID) + // check if blob size is larger than threshold in Dynamic config if so alert on it every time + if blobSize > int64(config.LargeShardHistoryBlobMetricThreshold()) { + logger.SampleInfo("Workflow writing a large blob", config.SampleLoggingRate(), tag.WorkflowDomainName(domainName), + tag.WorkflowRunID(workflowID), tag.ShardID(shardIDInt)) + metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardID)).IncCounter(metrics.LargeHistoryBlobCount) + } + // check if the new history count is greater than our threshold and only count/log it once when it passes it + if oldHistoryCount < int64(config.LargeShardHistoryEventMetricThreshold()) && newHistoryCount >= int64(config.LargeShardHistoryEventMetricThreshold()) { + logger.Warn("Workflow history event count is reaching dangerous levels", 
tag.WorkflowDomainName(domainName), + tag.WorkflowRunID(workflowID), tag.ShardID(shardIDInt)) + metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardID)).IncCounter(metrics.LargeHistoryEventCount) + } + // check if the new history size is greater than our threshold and only count/log it once when it passes it + if oldHistoryCount < int64(config.LargeShardHistorySizeMetricThreshold()) && newHistorySize >= int64(config.LargeShardHistorySizeMetricThreshold()) { + logger.Warn("Workflow history event size is reaching dangerous levels", tag.WorkflowDomainName(domainName), + tag.WorkflowRunID(workflowID), tag.ShardID(shardIDInt)) + metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardID)).IncCounter(metrics.LargeHistorySizeCount) + } + } +} + func emitWorkflowHistoryStats( metricsClient metrics.Client, domainName string, From d4c8cd4b9733c61a6cb09a2acd86747466de0b30 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Fri, 17 Mar 2023 15:21:11 -0700 Subject: [PATCH 02/29] fix lint --- service/history/execution/context_util.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index dda224389ad..4a01425bddc 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -21,10 +21,11 @@ package execution import ( - "github.com/uber/cadence/service/history/config" "strconv" "time" + "github.com/uber/cadence/service/history/config" + "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" From e38bb2a995f3ba8d9802ced4a5bf33dea77cced8 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Fri, 17 Mar 2023 15:28:18 -0700 Subject: [PATCH 03/29] fix bad keyname --- common/dynamicconfig/constants.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/dynamicconfig/constants.go 
b/common/dynamicconfig/constants.go index 02bc6b3e109..fc203da6273 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -3479,7 +3479,7 @@ var IntKeys = map[IntKey]DynamicInt{ DefaultValue: 10000, }, LargeShardHistoryBlobMetricThreshold: DynamicInt{ - KeyName: "system.largeShardHistoryEventMetricThreshold", + KeyName: "system.largeShardHistoryBlobMetricThreshold", Description: "defines the threshold for what consititutes a large history blob write to alert on, default is 1/4mb", DefaultValue: 262144, }, From 7b021c603d70d2b4f2931286d3f37627259082a0 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Fri, 17 Mar 2023 15:48:07 -0700 Subject: [PATCH 04/29] fix test --- common/metrics/defs.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/common/metrics/defs.go b/common/metrics/defs.go index ef4104ef0d8..b92c9b6b0e2 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -1163,6 +1163,7 @@ const ( LargeExecutionCountShardScope // LargeExecutionBlobShardScope is the scope to track large blobs for hotshard detection LargeExecutionBlobShardScope + NumHistoryScopes ) @@ -1986,10 +1987,6 @@ const ( ParentClosePolicyProcessorSuccess ParentClosePolicyProcessorFailures - LargeHistoryBlobCount - LargeHistoryEventCount - LargeHistorySizeCount - NumCommonMetrics // Needs to be last on this list for iota numbering ) @@ -2259,6 +2256,9 @@ const ( HistoryFailoverCallbackCount WorkflowVersionCount WorkflowTypeCount + LargeHistoryBlobCount + LargeHistoryEventCount + LargeHistorySizeCount NumHistoryMetrics ) From f0b5adae1d291531170a0f0809a60d7ee9575370 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Fri, 17 Mar 2023 16:17:07 -0700 Subject: [PATCH 05/29] fix test --- service/history/execution/context.go | 2 +- service/history/execution/context_util.go | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/service/history/execution/context.go b/service/history/execution/context.go 
index 1edfa5cc0b1..736573a9be8 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -851,7 +851,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( domainName, resp.MutableStateUpdateSessionStats, ) - emitLargeWorkflowShardIDStats(c.logger, c.metricsClient, c.shard.GetConfig(), string(c.shard.GetShardID()), domainName, c.workflowExecution.WorkflowID, + emitLargeWorkflowShardIDStats(c.logger, c.metricsClient, c.shard.GetConfig(), c.shard.GetShardID(), domainName, c.workflowExecution.WorkflowID, c.stats.HistorySize, c.mutableState.GetNextEventID()-1, currentWorkflowSize, currentWorkflowHistorySize, c.stats.HistorySize-currentWorkflowSize) // emit workflow completion stats if any if currentWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted { diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index 4a01425bddc..5ccef48571b 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -33,27 +33,27 @@ import ( "github.com/uber/cadence/common/types" ) -func emitLargeWorkflowShardIDStats(logger log.Logger, metricsClient metrics.Client, config *config.Config, shardID string, +func emitLargeWorkflowShardIDStats(logger log.Logger, metricsClient metrics.Client, config *config.Config, shardID int, domainName string, workflowID string, newHistorySize int64, newHistoryCount int64, oldHistorySize int64, oldHistoryCount int64, blobSize int64) { if config.EnableShardIDMetrics() { - shardIDInt, _ := strconv.Atoi(shardID) + shardIDStr := strconv.Itoa(shardID) // check if blob size is larger than threshold in Dynamic config if so alert on it every time if blobSize > int64(config.LargeShardHistoryBlobMetricThreshold()) { logger.SampleInfo("Workflow writing a large blob", config.SampleLoggingRate(), tag.WorkflowDomainName(domainName), - tag.WorkflowRunID(workflowID), tag.ShardID(shardIDInt)) - 
metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardID)).IncCounter(metrics.LargeHistoryBlobCount) + tag.WorkflowRunID(workflowID), tag.ShardID(shardID)) + metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryBlobCount) } // check if the new history count is greater than our threshold and only count/log it once when it passes it if oldHistoryCount < int64(config.LargeShardHistoryEventMetricThreshold()) && newHistoryCount >= int64(config.LargeShardHistoryEventMetricThreshold()) { logger.Warn("Workflow history event count is reaching dangerous levels", tag.WorkflowDomainName(domainName), - tag.WorkflowRunID(workflowID), tag.ShardID(shardIDInt)) - metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardID)).IncCounter(metrics.LargeHistoryEventCount) + tag.WorkflowRunID(workflowID), tag.ShardID(shardID)) + metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryEventCount) } // check if the new history size is greater than our threshold and only count/log it once when it passes it - if oldHistoryCount < int64(config.LargeShardHistorySizeMetricThreshold()) && newHistorySize >= int64(config.LargeShardHistorySizeMetricThreshold()) { + if oldHistorySize < int64(config.LargeShardHistorySizeMetricThreshold()) && newHistorySize >= int64(config.LargeShardHistorySizeMetricThreshold()) { logger.Warn("Workflow history event size is reaching dangerous levels", tag.WorkflowDomainName(domainName), - tag.WorkflowRunID(workflowID), tag.ShardID(shardIDInt)) - metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardID)).IncCounter(metrics.LargeHistorySizeCount) + tag.WorkflowRunID(workflowID), tag.ShardID(shardID)) + metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistorySizeCount) } } } From 
55fccb50f2ebad93723957854f67f5458e6399be Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Mon, 20 Mar 2023 15:32:59 -0700 Subject: [PATCH 06/29] turn off to test --- service/history/execution/context_util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index 5ccef48571b..ad2a2a4f9ec 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -39,7 +39,7 @@ func emitLargeWorkflowShardIDStats(logger log.Logger, metricsClient metrics.Clie shardIDStr := strconv.Itoa(shardID) // check if blob size is larger than threshold in Dynamic config if so alert on it every time if blobSize > int64(config.LargeShardHistoryBlobMetricThreshold()) { - logger.SampleInfo("Workflow writing a large blob", config.SampleLoggingRate(), tag.WorkflowDomainName(domainName), + logger.SampleInfo("Workflow writing a large blob", 1, tag.WorkflowDomainName(domainName), tag.WorkflowRunID(workflowID), tag.ShardID(shardID)) metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryBlobCount) } From 1c684fbd15fe934f29a1832459b571536d1a4cf8 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Mon, 20 Mar 2023 18:10:20 -0700 Subject: [PATCH 07/29] change to overall shard --- common/persistence/persistenceMetricClients.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go index 8997411759e..c37b80fa37e 100644 --- a/common/persistence/persistenceMetricClients.go +++ b/common/persistence/persistenceMetricClients.go @@ -315,7 +315,7 @@ func (p *persistenceMetricsClientBase) callWithDomainAndShardScope(scope int, op duration := time.Since(before) domainMetricsScope.RecordTimer(metrics.PersistenceLatencyPerDomain, duration) - 
shardOperationsMetricsScope.RecordTimer(metrics.PersistenceLatencyPerShard, duration) + shardOverallMetricsScope.RecordTimer(metrics.PersistenceLatencyPerShard, duration) if p.enableLatencyHistogramMetrics { domainMetricsScope.RecordHistogramDuration(metrics.PersistenceLatencyHistogram, duration) From 737d830b7756bee5009be23d2b6f2cf36967e8b7 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Mon, 20 Mar 2023 18:21:24 -0700 Subject: [PATCH 08/29] add operation too --- common/persistence/persistenceMetricClients.go | 1 + 1 file changed, 1 insertion(+) diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go index c37b80fa37e..b553931f801 100644 --- a/common/persistence/persistenceMetricClients.go +++ b/common/persistence/persistenceMetricClients.go @@ -315,6 +315,7 @@ func (p *persistenceMetricsClientBase) callWithDomainAndShardScope(scope int, op duration := time.Since(before) domainMetricsScope.RecordTimer(metrics.PersistenceLatencyPerDomain, duration) + shardOperationsMetricsScope.RecordTimer(metrics.PersistenceLatencyPerShard, duration) shardOverallMetricsScope.RecordTimer(metrics.PersistenceLatencyPerShard, duration) if p.enableLatencyHistogramMetrics { From 7cb379e2c2ebd609cfc34894122bdc8e97d6ad5e Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Tue, 21 Mar 2023 14:24:46 -0700 Subject: [PATCH 09/29] test --- service/history/execution/context_util.go | 1 + 1 file changed, 1 insertion(+) diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index ad2a2a4f9ec..410ca0f1b14 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -35,6 +35,7 @@ import ( func emitLargeWorkflowShardIDStats(logger log.Logger, metricsClient metrics.Client, config *config.Config, shardID int, domainName string, workflowID string, newHistorySize int64, newHistoryCount int64, oldHistorySize int64, oldHistoryCount int64, blobSize int64) { + 
logger.Info("WE CAN SEE THIS") if config.EnableShardIDMetrics() { shardIDStr := strconv.Itoa(shardID) // check if blob size is larger than threshold in Dynamic config if so alert on it every time From d72cff01e15bc4ee769fc32ae686fd47ba6f950e Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Tue, 21 Mar 2023 14:38:37 -0700 Subject: [PATCH 10/29] more logs --- service/history/execution/context_util.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index 410ca0f1b14..6aa6657572b 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -35,7 +35,10 @@ import ( func emitLargeWorkflowShardIDStats(logger log.Logger, metricsClient metrics.Client, config *config.Config, shardID int, domainName string, workflowID string, newHistorySize int64, newHistoryCount int64, oldHistorySize int64, oldHistoryCount int64, blobSize int64) { - logger.Info("WE CAN SEE THIS") + if domainName == "cadence-killers-staging" { + logger.Info("INFORMATION DUMP", tag.Dynamic("newHistorySize", newHistorySize), tag.Dynamic("newHistoryCount", newHistoryCount), + tag.Dynamic("oldHistorySize", oldHistorySize), tag.Dynamic("oldHistoryCount", oldHistoryCount), tag.Dynamic("blobSize", blobSize)) + } if config.EnableShardIDMetrics() { shardIDStr := strconv.Itoa(shardID) // check if blob size is larger than threshold in Dynamic config if so alert on it every time From 2399e313cc4647313e6d15d364087f4c5ade44fd Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Tue, 21 Mar 2023 15:13:48 -0700 Subject: [PATCH 11/29] try again and refactor --- service/history/execution/context.go | 7 ++++--- service/history/execution/context_util.go | 8 ++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/service/history/execution/context.go b/service/history/execution/context.go index 736573a9be8..0c5fac9f86c 100644 --- a/service/history/execution/context.go +++ 
b/service/history/execution/context.go @@ -720,10 +720,11 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( return err } var persistedBlobs events.PersistedBlobs - currentWorkflowSize := c.GetHistorySize() - currentWorkflowHistorySize := c.mutableState.GetNextEventID() - 1 + currentWorkflowSize, oldWorkflowSize := c.GetHistorySize(), c.GetHistorySize() + currentWorkflowHistoryCount, oldWorkflowHistoryCount := c.mutableState.GetNextEventID()-1, c.mutableState.GetNextEventID()-1 for _, workflowEvents := range currentWorkflowEventsSeq { blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents) + oldWorkflowHistorySize += int64(len(workflowEvents.Events)) if err != nil { return err } @@ -852,7 +853,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( resp.MutableStateUpdateSessionStats, ) emitLargeWorkflowShardIDStats(c.logger, c.metricsClient, c.shard.GetConfig(), c.shard.GetShardID(), domainName, c.workflowExecution.WorkflowID, - c.stats.HistorySize, c.mutableState.GetNextEventID()-1, currentWorkflowSize, currentWorkflowHistorySize, c.stats.HistorySize-currentWorkflowSize) + currentWorkflowSize, currentWorkflowHistoryCount, oldWorkflowSize, oldWorkflowHistoryCount, currentWorkflowHistoryCount-oldWorkflowHistoryCount) // emit workflow completion stats if any if currentWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted { if event, err := c.mutableState.GetCompletionEvent(ctx); err == nil { diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index 6aa6657572b..bb916e91db6 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -34,9 +34,9 @@ import ( ) func emitLargeWorkflowShardIDStats(logger log.Logger, metricsClient metrics.Client, config *config.Config, shardID int, - domainName string, workflowID string, newHistorySize int64, newHistoryCount int64, oldHistorySize int64, oldHistoryCount int64, blobSize int64) { + domainName 
string, workflowID string, currentHistorySize int64, currentHistoryCount int64, oldHistorySize int64, oldHistoryCount int64, blobSize int64) { if domainName == "cadence-killers-staging" { - logger.Info("INFORMATION DUMP", tag.Dynamic("newHistorySize", newHistorySize), tag.Dynamic("newHistoryCount", newHistoryCount), + logger.Info("INFORMATION DUMP", tag.Dynamic("currentHistorySize", currentHistorySize), tag.Dynamic("currentHistoryCount", currentHistoryCount), tag.Dynamic("oldHistorySize", oldHistorySize), tag.Dynamic("oldHistoryCount", oldHistoryCount), tag.Dynamic("blobSize", blobSize)) } if config.EnableShardIDMetrics() { @@ -48,13 +48,13 @@ func emitLargeWorkflowShardIDStats(logger log.Logger, metricsClient metrics.Clie metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryBlobCount) } // check if the new history count is greater than our threshold and only count/log it once when it passes it - if oldHistoryCount < int64(config.LargeShardHistoryEventMetricThreshold()) && newHistoryCount >= int64(config.LargeShardHistoryEventMetricThreshold()) { + if oldHistoryCount < int64(config.LargeShardHistoryEventMetricThreshold()) && currentHistoryCount >= int64(config.LargeShardHistoryEventMetricThreshold()) { logger.Warn("Workflow history event count is reaching dangerous levels", tag.WorkflowDomainName(domainName), tag.WorkflowRunID(workflowID), tag.ShardID(shardID)) metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryEventCount) } // check if the new history size is greater than our threshold and only count/log it once when it passes it - if oldHistorySize < int64(config.LargeShardHistorySizeMetricThreshold()) && newHistorySize >= int64(config.LargeShardHistorySizeMetricThreshold()) { + if oldHistorySize < int64(config.LargeShardHistorySizeMetricThreshold()) && currentHistorySize >= int64(config.LargeShardHistorySizeMetricThreshold()) 
{ logger.Warn("Workflow history event size is reaching dangerous levels", tag.WorkflowDomainName(domainName), tag.WorkflowRunID(workflowID), tag.ShardID(shardID)) metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistorySizeCount) From 63e1fbf607b3a35e0d929e79753006c9bf740d46 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Tue, 21 Mar 2023 15:36:24 -0700 Subject: [PATCH 12/29] fix ssome shit --- service/history/execution/context.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/history/execution/context.go b/service/history/execution/context.go index 0c5fac9f86c..5ec3ea11ff3 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -724,7 +724,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( currentWorkflowHistoryCount, oldWorkflowHistoryCount := c.mutableState.GetNextEventID()-1, c.mutableState.GetNextEventID()-1 for _, workflowEvents := range currentWorkflowEventsSeq { blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents) - oldWorkflowHistorySize += int64(len(workflowEvents.Events)) + currentWorkflowHistoryCount += int64(len(workflowEvents.Events)) if err != nil { return err } From 94df73ed9259f3f6f3a08916ca2aa56c05f560be Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Tue, 21 Mar 2023 16:04:54 -0700 Subject: [PATCH 13/29] fix blob size --- service/history/execution/context.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/history/execution/context.go b/service/history/execution/context.go index 5ec3ea11ff3..bddd62a03b1 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -853,7 +853,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( resp.MutableStateUpdateSessionStats, ) emitLargeWorkflowShardIDStats(c.logger, c.metricsClient, c.shard.GetConfig(), c.shard.GetShardID(), domainName, c.workflowExecution.WorkflowID, - 
currentWorkflowSize, currentWorkflowHistoryCount, oldWorkflowSize, oldWorkflowHistoryCount, currentWorkflowHistoryCount-oldWorkflowHistoryCount) + currentWorkflowSize, currentWorkflowHistoryCount, oldWorkflowSize, oldWorkflowHistoryCount, currentWorkflowSize-oldWorkflowSize) // emit workflow completion stats if any if currentWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted { if event, err := c.mutableState.GetCompletionEvent(ctx); err == nil { From 316ace6d2bd849ba48d090976c049b71a23773d8 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Tue, 21 Mar 2023 16:22:22 -0700 Subject: [PATCH 14/29] test more --- service/history/execution/context_util.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index bb916e91db6..1e2c479d41b 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -35,28 +35,36 @@ import ( func emitLargeWorkflowShardIDStats(logger log.Logger, metricsClient metrics.Client, config *config.Config, shardID int, domainName string, workflowID string, currentHistorySize int64, currentHistoryCount int64, oldHistorySize int64, oldHistoryCount int64, blobSize int64) { - if domainName == "cadence-killers-staging" { - logger.Info("INFORMATION DUMP", tag.Dynamic("currentHistorySize", currentHistorySize), tag.Dynamic("currentHistoryCount", currentHistoryCount), - tag.Dynamic("oldHistorySize", oldHistorySize), tag.Dynamic("oldHistoryCount", oldHistoryCount), tag.Dynamic("blobSize", blobSize)) - } if config.EnableShardIDMetrics() { shardIDStr := strconv.Itoa(shardID) // check if blob size is larger than threshold in Dynamic config if so alert on it every time if blobSize > int64(config.LargeShardHistoryBlobMetricThreshold()) { + if domainName == "cadence-killers-staging" { + logger.Info("INFORMATION DUMP BLOB", tag.Dynamic("currentHistorySize", currentHistorySize), 
tag.Dynamic("currentHistoryCount", currentHistoryCount), + tag.Dynamic("oldHistorySize", oldHistorySize), tag.Dynamic("oldHistoryCount", oldHistoryCount), tag.Dynamic("blobSize", blobSize)) + } logger.SampleInfo("Workflow writing a large blob", 1, tag.WorkflowDomainName(domainName), - tag.WorkflowRunID(workflowID), tag.ShardID(shardID)) + tag.WorkflowID(workflowID), tag.ShardID(shardID)) metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryBlobCount) } // check if the new history count is greater than our threshold and only count/log it once when it passes it if oldHistoryCount < int64(config.LargeShardHistoryEventMetricThreshold()) && currentHistoryCount >= int64(config.LargeShardHistoryEventMetricThreshold()) { + if domainName == "cadence-killers-staging" { + logger.Info("INFORMATION DUMP COUNT", tag.Dynamic("currentHistorySize", currentHistorySize), tag.Dynamic("currentHistoryCount", currentHistoryCount), + tag.Dynamic("oldHistorySize", oldHistorySize), tag.Dynamic("oldHistoryCount", oldHistoryCount), tag.Dynamic("blobSize", blobSize)) + } logger.Warn("Workflow history event count is reaching dangerous levels", tag.WorkflowDomainName(domainName), - tag.WorkflowRunID(workflowID), tag.ShardID(shardID)) + tag.WorkflowID(workflowID), tag.ShardID(shardID)) metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryEventCount) } // check if the new history size is greater than our threshold and only count/log it once when it passes it if oldHistorySize < int64(config.LargeShardHistorySizeMetricThreshold()) && currentHistorySize >= int64(config.LargeShardHistorySizeMetricThreshold()) { + if domainName == "cadence-killers-staging" { + logger.Info("INFORMATION DUMP SIZE", tag.Dynamic("currentHistorySize", currentHistorySize), tag.Dynamic("currentHistoryCount", currentHistoryCount), + tag.Dynamic("oldHistorySize", oldHistorySize), 
tag.Dynamic("oldHistoryCount", oldHistoryCount), tag.Dynamic("blobSize", blobSize)) + } logger.Warn("Workflow history event size is reaching dangerous levels", tag.WorkflowDomainName(domainName), - tag.WorkflowRunID(workflowID), tag.ShardID(shardID)) + tag.WorkflowID(workflowID), tag.ShardID(shardID)) metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistorySizeCount) } } From 1d64236087b9119804ffed5c5cd68707ce996045 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Tue, 21 Mar 2023 16:57:29 -0700 Subject: [PATCH 15/29] remove testing code --- service/history/execution/context_util.go | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index 1e2c479d41b..14dfeb5feb8 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -39,30 +39,19 @@ func emitLargeWorkflowShardIDStats(logger log.Logger, metricsClient metrics.Clie shardIDStr := strconv.Itoa(shardID) // check if blob size is larger than threshold in Dynamic config if so alert on it every time if blobSize > int64(config.LargeShardHistoryBlobMetricThreshold()) { - if domainName == "cadence-killers-staging" { - logger.Info("INFORMATION DUMP BLOB", tag.Dynamic("currentHistorySize", currentHistorySize), tag.Dynamic("currentHistoryCount", currentHistoryCount), - tag.Dynamic("oldHistorySize", oldHistorySize), tag.Dynamic("oldHistoryCount", oldHistoryCount), tag.Dynamic("blobSize", blobSize)) - } - logger.SampleInfo("Workflow writing a large blob", 1, tag.WorkflowDomainName(domainName), + logger.SampleInfo("Workflow writing a large blob", config.SampleLoggingRate(), tag.WorkflowDomainName(domainName), tag.WorkflowID(workflowID), tag.ShardID(shardID)) metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryBlobCount) } // check if 
the new history count is greater than our threshold and only count/log it once when it passes it + // this might sometimes double count if the workflow is extremely fast but should be ok to get a rough idea and identify bad actors if oldHistoryCount < int64(config.LargeShardHistoryEventMetricThreshold()) && currentHistoryCount >= int64(config.LargeShardHistoryEventMetricThreshold()) { - if domainName == "cadence-killers-staging" { - logger.Info("INFORMATION DUMP COUNT", tag.Dynamic("currentHistorySize", currentHistorySize), tag.Dynamic("currentHistoryCount", currentHistoryCount), - tag.Dynamic("oldHistorySize", oldHistorySize), tag.Dynamic("oldHistoryCount", oldHistoryCount), tag.Dynamic("blobSize", blobSize)) - } logger.Warn("Workflow history event count is reaching dangerous levels", tag.WorkflowDomainName(domainName), tag.WorkflowID(workflowID), tag.ShardID(shardID)) metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryEventCount) } // check if the new history size is greater than our threshold and only count/log it once when it passes it if oldHistorySize < int64(config.LargeShardHistorySizeMetricThreshold()) && currentHistorySize >= int64(config.LargeShardHistorySizeMetricThreshold()) { - if domainName == "cadence-killers-staging" { - logger.Info("INFORMATION DUMP SIZE", tag.Dynamic("currentHistorySize", currentHistorySize), tag.Dynamic("currentHistoryCount", currentHistoryCount), - tag.Dynamic("oldHistorySize", oldHistorySize), tag.Dynamic("oldHistoryCount", oldHistoryCount), tag.Dynamic("blobSize", blobSize)) - } logger.Warn("Workflow history event size is reaching dangerous levels", tag.WorkflowDomainName(domainName), tag.WorkflowID(workflowID), tag.ShardID(shardID)) metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistorySizeCount) From cb02506fe5cd2225f5481db85ddd60bc31f6c027 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: 
Tue, 28 Mar 2023 16:55:38 -0700 Subject: [PATCH 16/29] move stuff to context and change default value --- common/dynamicconfig/constants.go | 6 ++-- common/persistence/dataManagerInterfaces.go | 5 +++- service/history/execution/context.go | 6 ++-- service/history/execution/context_util.go | 33 ++++++++++----------- 4 files changed, 26 insertions(+), 24 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index fc203da6273..c0eda3bea51 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1349,12 +1349,12 @@ const ( // LargeShardHistorySizeMetricThreshold defines the threshold for what consititutes a large history storage size to alert on // KeyName: system.largeShardHistorySizeMetricThreshold // Value type: Int - // Default value: 10485760 + // Default value: 10485760 (10mb) LargeShardHistorySizeMetricThreshold // LargeShardHistoryEventMetricThreshold defines the threshold for what consititutes a large history event size to alert on // KeyName: system.largeShardHistoryEventMetricThreshold // Value type: Int - // Default value: 10000 + // Default value: 50 * 1024 LargeShardHistoryEventMetricThreshold // LargeShardHistoryBlobMetricThreshold defines the threshold for what consititutes a large history blob size to alert on // KeyName: system.largeShardHistoryBlobMetricThreshold @@ -3476,7 +3476,7 @@ var IntKeys = map[IntKey]DynamicInt{ LargeShardHistoryEventMetricThreshold: DynamicInt{ KeyName: "system.largeShardHistoryEventMetricThreshold", Description: "defines the threshold for what consititutes a large history event length to alert on, default is 10000", - DefaultValue: 10000, + DefaultValue: 50 * 1024, }, LargeShardHistoryBlobMetricThreshold: DynamicInt{ KeyName: "system.largeShardHistoryBlobMetricThreshold", diff --git a/common/persistence/dataManagerInterfaces.go b/common/persistence/dataManagerInterfaces.go index 49d9825774d..93d785c34e0 100644 --- 
a/common/persistence/dataManagerInterfaces.go +++ b/common/persistence/dataManagerInterfaces.go @@ -361,7 +361,10 @@ type ( // ExecutionStats is the statistics about workflow execution ExecutionStats struct { - HistorySize int64 + HistorySize int64 + OldHistorySize int64 + OldHistoryCount int64 + BlobSize int64 } // ReplicationState represents mutable state information for global domains. diff --git a/service/history/execution/context.go b/service/history/execution/context.go index bddd62a03b1..f1be4d3fb82 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -852,8 +852,10 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( domainName, resp.MutableStateUpdateSessionStats, ) - emitLargeWorkflowShardIDStats(c.logger, c.metricsClient, c.shard.GetConfig(), c.shard.GetShardID(), domainName, c.workflowExecution.WorkflowID, - currentWorkflowSize, currentWorkflowHistoryCount, oldWorkflowSize, oldWorkflowHistoryCount, currentWorkflowSize-oldWorkflowSize) + c.stats.BlobSize = currentWorkflowSize - oldWorkflowSize + c.stats.OldHistoryCount = oldWorkflowHistoryCount + c.stats.OldHistorySize = oldWorkflowSize + c.emitLargeWorkflowShardIDStats() // emit workflow completion stats if any if currentWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted { if event, err := c.mutableState.GetCompletionEvent(ctx); err == nil { diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index 14dfeb5feb8..d013d400ca6 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -24,8 +24,6 @@ import ( "strconv" "time" - "github.com/uber/cadence/service/history/config" - "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" @@ -33,28 +31,27 @@ import ( "github.com/uber/cadence/common/types" ) -func emitLargeWorkflowShardIDStats(logger log.Logger, metricsClient metrics.Client, config 
*config.Config, shardID int, - domainName string, workflowID string, currentHistorySize int64, currentHistoryCount int64, oldHistorySize int64, oldHistoryCount int64, blobSize int64) { - if config.EnableShardIDMetrics() { - shardIDStr := strconv.Itoa(shardID) +func (c *contextImpl) emitLargeWorkflowShardIDStats() { + if c.shard.GetConfig().EnableShardIDMetrics() { + shardIDStr := strconv.Itoa(c.shard.GetShardID()) // check if blob size is larger than threshold in Dynamic config if so alert on it every time - if blobSize > int64(config.LargeShardHistoryBlobMetricThreshold()) { - logger.SampleInfo("Workflow writing a large blob", config.SampleLoggingRate(), tag.WorkflowDomainName(domainName), - tag.WorkflowID(workflowID), tag.ShardID(shardID)) - metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryBlobCount) + if c.stats.BlobSize > int64(c.shard.GetConfig().LargeShardHistoryBlobMetricThreshold()) { + c.logger.SampleInfo("Workflow writing a large blob", c.shard.GetConfig().SampleLoggingRate(), tag.WorkflowDomainName(c.GetDomainName()), + tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID())) + c.metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryBlobCount) } // check if the new history count is greater than our threshold and only count/log it once when it passes it // this might sometimes double count if the workflow is extremely fast but should be ok to get a rough idea and identify bad actors - if oldHistoryCount < int64(config.LargeShardHistoryEventMetricThreshold()) && currentHistoryCount >= int64(config.LargeShardHistoryEventMetricThreshold()) { - logger.Warn("Workflow history event count is reaching dangerous levels", tag.WorkflowDomainName(domainName), - tag.WorkflowID(workflowID), tag.ShardID(shardID)) - metricsClient.Scope(metrics.LargeExecutionCountShardScope, 
metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryEventCount) + if c.stats.OldHistoryCount < int64(c.shard.GetConfig().LargeShardHistoryEventMetricThreshold()) && (c.mutableState.GetNextEventID()-1) >= int64(c.shard.GetConfig().LargeShardHistoryEventMetricThreshold()) { + c.logger.Warn("Workflow history event count is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()), + tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID())) + c.metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryEventCount) } // check if the new history size is greater than our threshold and only count/log it once when it passes it - if oldHistorySize < int64(config.LargeShardHistorySizeMetricThreshold()) && currentHistorySize >= int64(config.LargeShardHistorySizeMetricThreshold()) { - logger.Warn("Workflow history event size is reaching dangerous levels", tag.WorkflowDomainName(domainName), - tag.WorkflowID(workflowID), tag.ShardID(shardID)) - metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistorySizeCount) + if c.stats.OldHistorySize < int64(c.shard.GetConfig().LargeShardHistorySizeMetricThreshold()) && c.stats.HistorySize >= int64(c.shard.GetConfig().LargeShardHistorySizeMetricThreshold()) { + c.logger.Warn("Workflow history event size is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()), + tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID())) + c.metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistorySizeCount) } } } From 1c87d1191d48fe1f7e783338b5aa6e60a17f1735 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Tue, 28 Mar 2023 16:59:23 -0700 Subject: [PATCH 17/29] add defaults --- service/history/execution/context.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 
deletions(-) diff --git a/service/history/execution/context.go b/service/history/execution/context.go index f1be4d3fb82..1894fcb083e 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -194,7 +194,10 @@ func NewContext( metricsClient: shard.GetMetricsClient(), mutex: locks.NewMutex(), stats: &persistence.ExecutionStats{ - HistorySize: 0, + HistorySize: 0, + OldHistorySize: 0, + OldHistoryCount: 0, + BlobSize: 0, }, } } @@ -211,7 +214,10 @@ func (c *contextImpl) Clear() { c.metricsClient.IncCounter(metrics.WorkflowContextScope, metrics.WorkflowContextCleared) c.mutableState = nil c.stats = &persistence.ExecutionStats{ - HistorySize: 0, + HistorySize: 0, + OldHistorySize: 0, + OldHistoryCount: 0, + BlobSize: 0, } } From f3ae86c224d4529620f1b0172f79e0df9dae7cae Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Tue, 28 Mar 2023 17:21:40 -0700 Subject: [PATCH 18/29] add check against domain value --- service/history/execution/context_util.go | 27 ++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index d013d400ca6..cf3669942c2 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -34,21 +34,42 @@ import ( func (c *contextImpl) emitLargeWorkflowShardIDStats() { if c.shard.GetConfig().EnableShardIDMetrics() { shardIDStr := strconv.Itoa(c.shard.GetShardID()) + + var blobSizeWarn int64 + if c.shard.GetConfig().LargeShardHistoryBlobMetricThreshold() < c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName()) { + blobSizeWarn = int64(c.shard.GetConfig().LargeShardHistoryBlobMetricThreshold()) + } else { + blobSizeWarn = int64(c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName())) + } // check if blob size is larger than threshold in Dynamic config if so alert on it every time - if c.stats.BlobSize > int64(c.shard.GetConfig().LargeShardHistoryBlobMetricThreshold())
{ + if c.stats.BlobSize > blobSizeWarn { c.logger.SampleInfo("Workflow writing a large blob", c.shard.GetConfig().SampleLoggingRate(), tag.WorkflowDomainName(c.GetDomainName()), tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID())) c.metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryBlobCount) } + + var historyCountWarn int64 + if c.shard.GetConfig().LargeShardHistoryEventMetricThreshold() < c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName()) { + historyCountWarn = int64(c.shard.GetConfig().LargeShardHistoryEventMetricThreshold()) + } else { + historyCountWarn = int64(c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName())) + } // check if the new history count is greater than our threshold and only count/log it once when it passes it // this might sometimes double count if the workflow is extremely fast but should be ok to get a rough idea and identify bad actors - if c.stats.OldHistoryCount < int64(c.shard.GetConfig().LargeShardHistoryEventMetricThreshold()) && (c.mutableState.GetNextEventID()-1) >= int64(c.shard.GetConfig().LargeShardHistoryEventMetricThreshold()) { + if c.stats.OldHistoryCount < historyCountWarn && (c.mutableState.GetNextEventID()-1) >= historyCountWarn { c.logger.Warn("Workflow history event count is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()), tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID())) c.metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryEventCount) } + + var historySizeWarn int64 + if c.shard.GetConfig().LargeShardHistorySizeMetricThreshold() < c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName()) { + historySizeWarn = int64(c.shard.GetConfig().LargeShardHistorySizeMetricThreshold()) + } else { + historySizeWarn = int64(c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName())) 
+ } // check if the new history size is greater than our threshold and only count/log it once when it passes it - if c.stats.OldHistorySize < int64(c.shard.GetConfig().LargeShardHistorySizeMetricThreshold()) && c.stats.HistorySize >= int64(c.shard.GetConfig().LargeShardHistorySizeMetricThreshold()) { + if c.stats.OldHistorySize < historySizeWarn && c.stats.HistorySize >= historySizeWarn { c.logger.Warn("Workflow history event size is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()), tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID())) c.metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistorySizeCount) From 52ce7351426573315768c58d4d7c98cfbd67e91c Mon Sep 17 00:00:00 2001 From: allenchen2244 <102192478+allenchen2244@users.noreply.github.com> Date: Tue, 28 Mar 2023 18:00:00 -0700 Subject: [PATCH 19/29] Update common/dynamicconfig/constants.go Co-authored-by: Steven L --- common/dynamicconfig/constants.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index c0eda3bea51..886bb3c2400 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -3475,7 +3475,7 @@ var IntKeys = map[IntKey]DynamicInt{ }, LargeShardHistoryEventMetricThreshold: DynamicInt{ KeyName: "system.largeShardHistoryEventMetricThreshold", - Description: "defines the threshold for what consititutes a large history event length to alert on, default is 10000", + Description: "defines the threshold for what constitutes a large history event length to alert on, default is 50k", DefaultValue: 50 * 1024, }, LargeShardHistoryBlobMetricThreshold: DynamicInt{ From 5c4671da41d83e6def40007b1c47b1f376bc52c4 Mon Sep 17 00:00:00 2001 From: allenchen2244 <102192478+allenchen2244@users.noreply.github.com> Date: Tue, 28 Mar 2023 18:00:18 -0700 Subject: [PATCH 20/29] Update
common/dynamicconfig/constants.go Co-authored-by: Steven L --- common/dynamicconfig/constants.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 886bb3c2400..831e1cce88b 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1359,7 +1359,7 @@ const ( // LargeShardHistoryBlobMetricThreshold defines the threshold for what consititutes a large history blob size to alert on // KeyName: system.largeShardHistoryBlobMetricThreshold // Value type: Int - // Default value: 262144 + // Default value: 262144 (1/4mb) LargeShardHistoryBlobMetricThreshold // LastIntKey must be the last one in this const group LastIntKey From 189be4df3c4147aef3dd6d13c91ad667c149ff30 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Tue, 28 Mar 2023 18:16:34 -0700 Subject: [PATCH 21/29] call flipr only once --- service/history/execution/context_util.go | 24 ++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index cf3669942c2..77fc37c7a5a 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -36,10 +36,12 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats() { shardIDStr := strconv.Itoa(c.shard.GetShardID()) var blobSizeWarn int64 - if c.shard.GetConfig().LargeShardHistoryBlobMetricThreshold() < c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName()) { - blobSizeWarn = int64(c.shard.GetConfig().LargeShardHistoryBlobMetricThreshold()) + blobSizeGlobalWarn := c.shard.GetConfig().LargeShardHistoryBlobMetricThreshold() + blobSizeDomainWarn := c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName()) + if blobSizeGlobalWarn < blobSizeDomainWarn { + blobSizeWarn = int64(blobSizeGlobalWarn) } else { - blobSizeWarn = int64(c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName())) + blobSizeWarn = 
int64(blobSizeDomainWarn) } // check if blob size is larger than threshold in Dynamic config if so alert on it every time if c.stats.BlobSize > blobSizeWarn { @@ -49,10 +51,12 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats() { } var historyCountWarn int64 - if c.shard.GetConfig().LargeShardHistoryEventMetricThreshold() < c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName()) { - historyCountWarn = int64(c.shard.GetConfig().LargeShardHistoryEventMetricThreshold()) + historyCountGlobalWarn := c.shard.GetConfig().LargeShardHistoryEventMetricThreshold() + historyCountDomainWarn := c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName()) + if historyCountGlobalWarn < historyCountDomainWarn { + historyCountWarn = int64(historyCountGlobalWarn) } else { - historyCountWarn = int64(c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName())) + historyCountWarn = int64(historyCountDomainWarn) } // check if the new history count is greater than our threshold and only count/log it once when it passes it // this might sometimes double count if the workflow is extremely fast but should be ok to get a rough idea and identify bad actors @@ -63,10 +67,12 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats() { } var historySizeWarn int64 - if c.shard.GetConfig().LargeShardHistorySizeMetricThreshold() < c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName()) { - historySizeWarn = int64(c.shard.GetConfig().LargeShardHistorySizeMetricThreshold()) + historySizeGlobalWarn := c.shard.GetConfig().LargeShardHistorySizeMetricThreshold() + historySizeDomainWarn := c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName()) + if historySizeGlobalWarn < historySizeDomainWarn { + historySizeWarn = int64(historySizeGlobalWarn) } else { - historySizeWarn = int64(c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName())) + historySizeWarn = int64(historySizeDomainWarn) } // check if the new history size is greater than our threshold and only count/log it once when it 
passes it if c.stats.OldHistorySize < historySizeWarn && c.stats.HistorySize >= historySizeWarn { From 1f1d2a344ac73f54df0fb0d3bf4c13ef7f35d02d Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Tue, 28 Mar 2023 18:24:19 -0700 Subject: [PATCH 22/29] move out of persistence obj to fn arg --- common/persistence/dataManagerInterfaces.go | 5 +---- service/history/execution/context.go | 15 +++------------ service/history/execution/context_util.go | 8 ++++---- 3 files changed, 8 insertions(+), 20 deletions(-) diff --git a/common/persistence/dataManagerInterfaces.go b/common/persistence/dataManagerInterfaces.go index 93d785c34e0..49d9825774d 100644 --- a/common/persistence/dataManagerInterfaces.go +++ b/common/persistence/dataManagerInterfaces.go @@ -361,10 +361,7 @@ type ( // ExecutionStats is the statistics about workflow execution ExecutionStats struct { - HistorySize int64 - OldHistorySize int64 - OldHistoryCount int64 - BlobSize int64 + HistorySize int64 } // ReplicationState represents mutable state information for global domains. 
diff --git a/service/history/execution/context.go b/service/history/execution/context.go index 1894fcb083e..d675a19dbc2 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -194,10 +194,7 @@ func NewContext( metricsClient: shard.GetMetricsClient(), mutex: locks.NewMutex(), stats: &persistence.ExecutionStats{ - HistorySize: 0, - OldHistorySize: 0, - OldHistoryCount: 0, - BlobSize: 0, + HistorySize: 0, }, } } @@ -214,10 +211,7 @@ func (c *contextImpl) Clear() { c.metricsClient.IncCounter(metrics.WorkflowContextScope, metrics.WorkflowContextCleared) c.mutableState = nil c.stats = &persistence.ExecutionStats{ - HistorySize: 0, - OldHistorySize: 0, - OldHistoryCount: 0, - BlobSize: 0, + HistorySize: 0, } } @@ -858,10 +852,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( domainName, resp.MutableStateUpdateSessionStats, ) - c.stats.BlobSize = currentWorkflowSize - oldWorkflowSize - c.stats.OldHistoryCount = oldWorkflowHistoryCount - c.stats.OldHistorySize = oldWorkflowSize - c.emitLargeWorkflowShardIDStats() + c.emitLargeWorkflowShardIDStats(currentWorkflowSize-oldWorkflowSize, oldWorkflowHistoryCount, oldWorkflowSize) // emit workflow completion stats if any if currentWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted { if event, err := c.mutableState.GetCompletionEvent(ctx); err == nil { diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index 77fc37c7a5a..c9060368f83 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -31,7 +31,7 @@ import ( "github.com/uber/cadence/common/types" ) -func (c *contextImpl) emitLargeWorkflowShardIDStats() { +func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCount int64, oldHistorySize int64) { if c.shard.GetConfig().EnableShardIDMetrics() { shardIDStr := strconv.Itoa(c.shard.GetShardID()) @@ -44,7 +44,7 @@ func (c *contextImpl) 
emitLargeWorkflowShardIDStats() { blobSizeWarn = int64(blobSizeDomainWarn) } // check if blob size is larger than threshold in Dynamic config if so alert on it every time - if c.stats.BlobSize > blobSizeWarn { + if blobSize > blobSizeWarn { c.logger.SampleInfo("Workflow writing a large blob", c.shard.GetConfig().SampleLoggingRate(), tag.WorkflowDomainName(c.GetDomainName()), tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID())) c.metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryBlobCount) @@ -60,7 +60,7 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats() { } // check if the new history count is greater than our threshold and only count/log it once when it passes it // this might sometimes double count if the workflow is extremely fast but should be ok to get a rough idea and identify bad actors - if c.stats.OldHistoryCount < historyCountWarn && (c.mutableState.GetNextEventID()-1) >= historyCountWarn { + if oldHistoryCount < historyCountWarn && (c.mutableState.GetNextEventID()-1) >= historyCountWarn { c.logger.Warn("Workflow history event count is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()), tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID())) c.metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryEventCount) @@ -75,7 +75,7 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats() { historySizeWarn = int64(historySizeDomainWarn) } // check if the new history size is greater than our threshold and only count/log it once when it passes it - if c.stats.OldHistorySize < historySizeWarn && c.stats.HistorySize >= historySizeWarn { + if oldHistorySize < historySizeWarn && c.stats.HistorySize >= historySizeWarn { c.logger.Warn("Workflow history event size is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()), 
tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID())) c.metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistorySizeCount) From 2b86428b374b97926a190bf91f8fa12cd4e0e527 Mon Sep 17 00:00:00 2001 From: allenchen2244 <102192478+allenchen2244@users.noreply.github.com> Date: Tue, 28 Mar 2023 18:31:52 -0700 Subject: [PATCH 23/29] Update service/history/execution/context.go Co-authored-by: Steven L --- service/history/execution/context.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/service/history/execution/context.go b/service/history/execution/context.go index d675a19dbc2..ad83b8be0a0 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -720,8 +720,10 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( return err } var persistedBlobs events.PersistedBlobs - currentWorkflowSize, oldWorkflowSize := c.GetHistorySize(), c.GetHistorySize() - currentWorkflowHistoryCount, oldWorkflowHistoryCount := c.mutableState.GetNextEventID()-1, c.mutableState.GetNextEventID()-1 + currentWorkflowSize := c.GetHistorySize() + oldWorkflowSize := currentWorkflowSize + currentWorkflowHistoryCount := c.mutableState.GetNextEventID()-1 + oldWorkflowHistoryCount := currentWorkflowHistoryCount for _, workflowEvents := range currentWorkflowEventsSeq { blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents) currentWorkflowHistoryCount += int64(len(workflowEvents.Events)) From c5ae571b24dedbb8891e197b7f828971b948596a Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Tue, 28 Mar 2023 18:31:11 -0700 Subject: [PATCH 24/29] found the common minint64 --- service/history/execution/context_util.go | 28 ++++------------------- 1 file changed, 4 insertions(+), 24 deletions(-) diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index c9060368f83..646afb1b0f8 100644 --- 
a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -21,6 +21,7 @@ package execution import ( + "github.com/uber/cadence/common" "strconv" "time" @@ -35,14 +36,7 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCo if c.shard.GetConfig().EnableShardIDMetrics() { shardIDStr := strconv.Itoa(c.shard.GetShardID()) - var blobSizeWarn int64 - blobSizeGlobalWarn := c.shard.GetConfig().LargeShardHistoryBlobMetricThreshold() - blobSizeDomainWarn := c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName()) - if blobSizeGlobalWarn < blobSizeDomainWarn { - blobSizeWarn = int64(blobSizeGlobalWarn) - } else { - blobSizeWarn = int64(blobSizeDomainWarn) - } + blobSizeWarn := common.MinInt64(int64(c.shard.GetConfig().LargeShardHistoryBlobMetricThreshold()), int64(c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName()))) // check if blob size is larger than threshold in Dynamic config if so alert on it every time if blobSize > blobSizeWarn { c.logger.SampleInfo("Workflow writing a large blob", c.shard.GetConfig().SampleLoggingRate(), tag.WorkflowDomainName(c.GetDomainName()), @@ -50,14 +44,7 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCo c.metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryBlobCount) } - var historyCountWarn int64 - historyCountGlobalWarn := c.shard.GetConfig().LargeShardHistoryEventMetricThreshold() - historyCountDomainWarn := c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName()) - if historyCountGlobalWarn < historyCountDomainWarn { - historyCountWarn = int64(historyCountGlobalWarn) - } else { - historyCountWarn = int64(historyCountDomainWarn) - } + historyCountWarn := common.MinInt64(int64(c.shard.GetConfig().LargeShardHistoryEventMetricThreshold()), int64(c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName()))) // check if the new history count is greater than our 
threshold and only count/log it once when it passes it // this might sometimes double count if the workflow is extremely fast but should be ok to get a rough idea and identify bad actors if oldHistoryCount < historyCountWarn && (c.mutableState.GetNextEventID()-1) >= historyCountWarn { @@ -66,14 +53,7 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCo c.metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryEventCount) } - var historySizeWarn int64 - historySizeGlobalWarn := c.shard.GetConfig().LargeShardHistorySizeMetricThreshold() - historySizeDomainWarn := c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName()) - if historySizeGlobalWarn < historySizeDomainWarn { - historySizeWarn = int64(historySizeGlobalWarn) - } else { - historySizeWarn = int64(historySizeDomainWarn) - } + historySizeWarn := common.MinInt64(int64(c.shard.GetConfig().LargeShardHistorySizeMetricThreshold()), int64(c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName()))) // check if the new history size is greater than our threshold and only count/log it once when it passes it if oldHistorySize < historySizeWarn && c.stats.HistorySize >= historySizeWarn { c.logger.Warn("Workflow history event size is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()), From 4f58e0599b38e5ea0024e6f45fb0cf545c0c5a23 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Tue, 28 Mar 2023 18:43:33 -0700 Subject: [PATCH 25/29] fix lint --- service/history/execution/context.go | 2 +- service/history/execution/context_util.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/service/history/execution/context.go b/service/history/execution/context.go index ad83b8be0a0..8d851ba7e84 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -722,7 +722,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( var persistedBlobs 
events.PersistedBlobs currentWorkflowSize := c.GetHistorySize() oldWorkflowSize := currentWorkflowSize - currentWorkflowHistoryCount := c.mutableState.GetNextEventID()-1 + currentWorkflowHistoryCount := c.mutableState.GetNextEventID() - 1 oldWorkflowHistoryCount := currentWorkflowHistoryCount for _, workflowEvents := range currentWorkflowEventsSeq { blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents) diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index 646afb1b0f8..dd8463a12da 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -21,10 +21,11 @@ package execution import ( - "github.com/uber/cadence/common" "strconv" "time" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" From 280903abf0c5e193eba64172912246581db96d5a Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Wed, 29 Mar 2023 13:58:41 -0700 Subject: [PATCH 26/29] testing something --- service/history/execution/context_util.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index dd8463a12da..350676e9e5c 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -61,6 +61,15 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCo tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID())) c.metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistorySizeCount) } + + if c.GetDomainName() == "cadence-killers-staging" { + c.logger.Info("BlogSizeLimitWarn should be " + strconv.Itoa(c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName()))) + c.logger.Info("historyCountWarn should be " + 
strconv.Itoa(c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName()))) + c.logger.Info("historySizeWarn should be " + strconv.Itoa(c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName()))) + c.logger.Info("blobSizeWarn is " + strconv.Itoa(int(blobSizeWarn)) + "historyCountWarn is " + strconv.Itoa(int(historyCountWarn)) + + " historySizeWarn is " + strconv.Itoa(int(historySizeWarn)) + " other shit like newHistoryCount " + strconv.Itoa(int(c.mutableState.GetNextEventID()-1)) + + " blobSize " + strconv.Itoa(int(blobSize)) + " oldHistoryCount is " + strconv.Itoa(int(oldHistoryCount)) + " oldHistorySize is " + strconv.Itoa(int(oldHistorySize))) + } } } From 13522c662757c1d61e8c1965210bac3cea14d7e4 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Wed, 29 Mar 2023 14:22:08 -0700 Subject: [PATCH 27/29] add tag --- service/history/execution/context_util.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index 350676e9e5c..9be4d732c5b 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -61,15 +61,12 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCo tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID())) c.metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistorySizeCount) } - - if c.GetDomainName() == "cadence-killers-staging" { - c.logger.Info("BlogSizeLimitWarn should be " + strconv.Itoa(c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName()))) - c.logger.Info("historyCountWarn should be " + strconv.Itoa(c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName()))) - c.logger.Info("historySizeWarn should be " + strconv.Itoa(c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName()))) - c.logger.Info("blobSizeWarn is " + strconv.Itoa(int(blobSizeWarn)) + 
"historyCountWarn is " + strconv.Itoa(int(historyCountWarn)) + - " historySizeWarn is " + strconv.Itoa(int(historySizeWarn)) + " other shit like newHistoryCount " + strconv.Itoa(int(c.mutableState.GetNextEventID()-1)) + - " blobSize " + strconv.Itoa(int(blobSize)) + " oldHistoryCount is " + strconv.Itoa(int(oldHistoryCount)) + " oldHistorySize is " + strconv.Itoa(int(oldHistorySize))) - } + c.logger.Info("BlogSizeLimitWarn should be "+strconv.Itoa(c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName())), tag.WorkflowDomainName(c.GetDomainName())) + c.logger.Info("historyCountWarn should be "+strconv.Itoa(c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName())), tag.WorkflowDomainName(c.GetDomainName())) + c.logger.Info("historySizeWarn should be "+strconv.Itoa(c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName())), tag.WorkflowDomainName(c.GetDomainName())) + c.logger.Info("blobSizeWarn is "+strconv.Itoa(int(blobSizeWarn))+"historyCountWarn is "+strconv.Itoa(int(historyCountWarn))+ + " historySizeWarn is "+strconv.Itoa(int(historySizeWarn))+" other shit like newHistoryCount "+strconv.Itoa(int(c.mutableState.GetNextEventID()-1))+ + " blobSize "+strconv.Itoa(int(blobSize))+" oldHistoryCount is "+strconv.Itoa(int(oldHistoryCount))+" oldHistorySize is "+strconv.Itoa(int(oldHistorySize)), tag.WorkflowDomainName(c.GetDomainName())) } } From 8b9c5bf864eebb2e000e6ea342ae524ab4ae8156 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Wed, 29 Mar 2023 14:58:04 -0700 Subject: [PATCH 28/29] add 1 more field --- service/history/execution/context.go | 2 +- service/history/execution/context_util.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/service/history/execution/context.go b/service/history/execution/context.go index 8d851ba7e84..c4c5d021673 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -854,7 +854,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew( domainName, 
resp.MutableStateUpdateSessionStats, ) - c.emitLargeWorkflowShardIDStats(currentWorkflowSize-oldWorkflowSize, oldWorkflowHistoryCount, oldWorkflowSize) + c.emitLargeWorkflowShardIDStats(currentWorkflowSize-oldWorkflowSize, oldWorkflowHistoryCount, oldWorkflowSize, currentWorkflowHistoryCount) // emit workflow completion stats if any if currentWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted { if event, err := c.mutableState.GetCompletionEvent(ctx); err == nil { diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index 9be4d732c5b..693163c26e4 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -33,7 +33,7 @@ import ( "github.com/uber/cadence/common/types" ) -func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCount int64, oldHistorySize int64) { +func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCount int64, oldHistorySize int64, newHistoryCount int64) { if c.shard.GetConfig().EnableShardIDMetrics() { shardIDStr := strconv.Itoa(c.shard.GetShardID()) @@ -48,7 +48,7 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCo historyCountWarn := common.MinInt64(int64(c.shard.GetConfig().LargeShardHistoryEventMetricThreshold()), int64(c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName()))) // check if the new history count is greater than our threshold and only count/log it once when it passes it // this might sometimes double count if the workflow is extremely fast but should be ok to get a rough idea and identify bad actors - if oldHistoryCount < historyCountWarn && (c.mutableState.GetNextEventID()-1) >= historyCountWarn { + if oldHistoryCount < historyCountWarn && newHistoryCount >= historyCountWarn { c.logger.Warn("Workflow history event count is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()), 
tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID())) c.metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryEventCount) @@ -65,7 +65,7 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCo c.logger.Info("historyCountWarn should be "+strconv.Itoa(c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName())), tag.WorkflowDomainName(c.GetDomainName())) c.logger.Info("historySizeWarn should be "+strconv.Itoa(c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName())), tag.WorkflowDomainName(c.GetDomainName())) c.logger.Info("blobSizeWarn is "+strconv.Itoa(int(blobSizeWarn))+"historyCountWarn is "+strconv.Itoa(int(historyCountWarn))+ - " historySizeWarn is "+strconv.Itoa(int(historySizeWarn))+" other shit like newHistoryCount "+strconv.Itoa(int(c.mutableState.GetNextEventID()-1))+ + " historySizeWarn is "+strconv.Itoa(int(historySizeWarn))+" other shit like newHistoryCount "+strconv.Itoa(int(newHistoryCount))+ " blobSize "+strconv.Itoa(int(blobSize))+" oldHistoryCount is "+strconv.Itoa(int(oldHistoryCount))+" oldHistorySize is "+strconv.Itoa(int(oldHistorySize)), tag.WorkflowDomainName(c.GetDomainName())) } } From edd9766e3b705d69332356bea6b4de0d05f4f357 Mon Sep 17 00:00:00 2001 From: Allen Chen Date: Wed, 29 Mar 2023 16:04:18 -0700 Subject: [PATCH 29/29] remove testing code --- service/history/execution/context_util.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go index 693163c26e4..cc0200e54dc 100644 --- a/service/history/execution/context_util.go +++ b/service/history/execution/context_util.go @@ -41,16 +41,16 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCo // check if blob size is larger than threshold in Dynamic config if so alert on it every time if blobSize > blobSizeWarn { 
c.logger.SampleInfo("Workflow writing a large blob", c.shard.GetConfig().SampleLoggingRate(), tag.WorkflowDomainName(c.GetDomainName()), - tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID())) + tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID()), tag.WorkflowRunID(c.workflowExecution.GetRunID())) c.metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryBlobCount) } historyCountWarn := common.MinInt64(int64(c.shard.GetConfig().LargeShardHistoryEventMetricThreshold()), int64(c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName()))) // check if the new history count is greater than our threshold and only count/log it once when it passes it - // this might sometimes double count if the workflow is extremely fast but should be ok to get a rough idea and identify bad actors + // this seems to double count and I can't figure out why but should be ok to get a rough idea and identify bad actors if oldHistoryCount < historyCountWarn && newHistoryCount >= historyCountWarn { c.logger.Warn("Workflow history event count is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()), - tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID())) + tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID()), tag.WorkflowRunID(c.workflowExecution.GetRunID())) c.metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryEventCount) } @@ -58,15 +58,9 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCo // check if the new history size is greater than our threshold and only count/log it once when it passes it if oldHistorySize < historySizeWarn && c.stats.HistorySize >= historySizeWarn { c.logger.Warn("Workflow history event size is reaching dangerous levels", 
tag.WorkflowDomainName(c.GetDomainName()), - tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID())) + tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID()), tag.WorkflowRunID(c.workflowExecution.GetRunID())) c.metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistorySizeCount) } - c.logger.Info("BlogSizeLimitWarn should be "+strconv.Itoa(c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName())), tag.WorkflowDomainName(c.GetDomainName())) - c.logger.Info("historyCountWarn should be "+strconv.Itoa(c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName())), tag.WorkflowDomainName(c.GetDomainName())) - c.logger.Info("historySizeWarn should be "+strconv.Itoa(c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName())), tag.WorkflowDomainName(c.GetDomainName())) - c.logger.Info("blobSizeWarn is "+strconv.Itoa(int(blobSizeWarn))+"historyCountWarn is "+strconv.Itoa(int(historyCountWarn))+ - " historySizeWarn is "+strconv.Itoa(int(historySizeWarn))+" other shit like newHistoryCount "+strconv.Itoa(int(newHistoryCount))+ - " blobSize "+strconv.Itoa(int(blobSize))+" oldHistoryCount is "+strconv.Itoa(int(oldHistoryCount))+" oldHistorySize is "+strconv.Itoa(int(oldHistorySize)), tag.WorkflowDomainName(c.GetDomainName())) } }