diff --git a/service/history/engine/engineimpl/describe_workflow_execution_test.go b/service/history/engine/engineimpl/describe_workflow_execution_test.go index f5c1d331ec7..0e8cf5cc472 100644 --- a/service/history/engine/engineimpl/describe_workflow_execution_test.go +++ b/service/history/engine/engineimpl/describe_workflow_execution_test.go @@ -41,8 +41,8 @@ func TestDescribeWorkflowExecution(t *testing.T) { eft := testdata.NewEngineForTest(t, NewEngineWithShardContext) childDomainID := "deleted-domain" - eft.Ctx.Resource.DomainCache.EXPECT().GetDomainName(constants.TestParentDomainID).Return(constants.TestParentDomainName, nil).AnyTimes() - eft.Ctx.Resource.DomainCache.EXPECT().GetDomainName(childDomainID).Return("", &types.EntityNotExistsError{}).AnyTimes() + eft.ShardCtx.Resource.DomainCache.EXPECT().GetDomainName(constants.TestParentDomainID).Return(constants.TestParentDomainName, nil).AnyTimes() + eft.ShardCtx.Resource.DomainCache.EXPECT().GetDomainName(childDomainID).Return("", &types.EntityNotExistsError{}).AnyTimes() execution := types.WorkflowExecution{ WorkflowID: constants.TestWorkflowID, RunID: constants.TestRunID, @@ -113,7 +113,7 @@ func TestDescribeWorkflowExecution(t *testing.T) { InitiatedID: 3000, ParentClosePolicy: types.ParentClosePolicyAbandon.Ptr(), } - eft.Ctx.Resource.ExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{ + eft.ShardCtx.Resource.ExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{ State: &persistence.WorkflowMutableState{ ActivityInfos: map[int64]*persistence.ActivityInfo{ 1: { @@ -191,7 +191,7 @@ func TestDescribeWorkflowExecution(t *testing.T) { }, }, }, nil).Once() - eft.Ctx.Resource.HistoryMgr.On("ReadHistoryBranch", mock.Anything, mock.Anything).Return(&persistence.ReadHistoryBranchResponse{ + eft.ShardCtx.Resource.HistoryMgr.On("ReadHistoryBranch", mock.Anything, mock.Anything).Return(&persistence.ReadHistoryBranchResponse{ HistoryEvents: []*types.HistoryEvent{ { ID: 1, diff --git a/service/history/engine/engineimpl/refresh_workflow_tasks_test.go b/service/history/engine/engineimpl/refresh_workflow_tasks_test.go new file mode 100644 index 00000000000..24af29e8fd4 --- /dev/null +++ b/service/history/engine/engineimpl/refresh_workflow_tasks_test.go @@ -0,0 +1,206 @@ +// Copyright (c) 2017-2021 Uber Technologies, Inc. +// Portions of the Software are attributed to Copyright (c) 2021 Temporal Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package engineimpl + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/mock" + + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/history/constants" + "github.com/uber/cadence/service/history/engine/testdata" +) + +func TestRefreshWorkflowTasks(t *testing.T) { + tests := []struct { + name string + execution types.WorkflowExecution + getWFExecErr error + readHistBranchErr error + updateWFExecErr error + wantErr bool + }{ + { + name: "runid is not uuid", + execution: types.WorkflowExecution{ + WorkflowID: constants.TestWorkflowID, + RunID: "not-a-uuid", + }, + wantErr: true, + }, + { + name: "failed to get workflow execution", + getWFExecErr: errors.New("some random error"), + execution: types.WorkflowExecution{ + WorkflowID: constants.TestWorkflowID, + RunID: constants.TestRunID, + }, + wantErr: true, + }, + { + name: "failed to get workflow start event", + readHistBranchErr: errors.New("some random error"), + execution: types.WorkflowExecution{ + WorkflowID: constants.TestWorkflowID, + RunID: constants.TestRunID, + }, + wantErr: true, + }, + { + name: "failed to update workflow execution", + // returning TimeoutError because it doesn't get retried + updateWFExecErr: &persistence.TimeoutError{}, + execution: types.WorkflowExecution{ + WorkflowID: constants.TestWorkflowID, + RunID: constants.TestRunID, + }, + wantErr: true, + }, + { + name: "success", + execution: types.WorkflowExecution{ + WorkflowID: constants.TestWorkflowID, + RunID: constants.TestRunID, + }, + wantErr: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + eft := testdata.NewEngineForTest(t, NewEngineWithShardContext) + eft.Engine.Start() + defer eft.Engine.Stop() + + // GetWorkflowExecution prep + getExecReq := &persistence.GetWorkflowExecutionRequest{ + DomainID: constants.TestDomainID, + Execution: tc.execution, + DomainName: constants.TestDomainName, + RangeID: 1, + } + getExecResp := &persistence.GetWorkflowExecutionResponse{ + State: &persistence.WorkflowMutableState{ + ExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: constants.TestDomainID, + WorkflowID: constants.TestWorkflowID, + RunID: constants.TestRunID, + }, + ExecutionStats: &persistence.ExecutionStats{}, + }, + MutableStateStats: &persistence.MutableStateStats{}, + } + eft.ShardCtx.Resource.ExecutionMgr. + On("GetWorkflowExecution", mock.Anything, getExecReq). + Return(getExecResp, tc.getWFExecErr). + Once() + + // ReadHistoryBranch prep + historyBranchResp := &persistence.ReadHistoryBranchResponse{ + HistoryEvents: []*types.HistoryEvent{ + // first event. + { + ID: 1, + WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{}, + }, + }, + } + historyMgr := eft.ShardCtx.Resource.HistoryMgr + historyMgr. + On("ReadHistoryBranch", mock.Anything, mock.Anything). + Return(historyBranchResp, tc.readHistBranchErr). + Once() + + // UpdateWorkflowExecution prep + var gotUpdateExecReq *persistence.UpdateWorkflowExecutionRequest + updateExecResp := &persistence.UpdateWorkflowExecutionResponse{ + MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}, + } + eft.ShardCtx.Resource.ExecutionMgr. + On("UpdateWorkflowExecution", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + var ok bool + gotUpdateExecReq, ok = args.Get(1).(*persistence.UpdateWorkflowExecutionRequest) + if !ok { + t.Fatalf("failed to cast input to *persistence.UpdateWorkflowExecutionRequest, type is %T", args.Get(1)) + } + }). + Return(updateExecResp, tc.updateWFExecErr). + Once() + + // UpdateShard prep. this is needed to update the shard's rangeID for failure cases. + eft.ShardCtx.Resource.ShardMgr. + On("UpdateShard", mock.Anything, mock.Anything). + Return(nil) + + // Call RefreshWorkflowTasks + err := eft.Engine.RefreshWorkflowTasks( + context.Background(), + constants.TestDomainID, + tc.execution, + ) + + // Error validations + if (err != nil) != tc.wantErr { + t.Fatalf("RefreshWorkflowTasks() error = %v, wantErr %v", err, tc.wantErr) + } + if err != nil { + return + } + + // UpdateWorkflowExecutionRequest validations + if gotUpdateExecReq == nil { + t.Fatal("UpdateWorkflowExecutionRequest is nil") + } + + if gotUpdateExecReq.RangeID != 1 { + t.Errorf("got RangeID %v, want 1", gotUpdateExecReq.RangeID) + } + + if gotUpdateExecReq.DomainName != constants.TestDomainName { + t.Errorf("got DomainName %v, want %v", gotUpdateExecReq.DomainName, constants.TestDomainName) + } + + if gotUpdateExecReq.Mode != persistence.UpdateWorkflowModeIgnoreCurrent { + t.Errorf("got Mode %v, want %v", gotUpdateExecReq.Mode, persistence.UpdateWorkflowModeIgnoreCurrent) + } + + if len(gotUpdateExecReq.UpdateWorkflowMutation.TimerTasks) != 2 { + t.Errorf("got %v TimerTasks, want 2", len(gotUpdateExecReq.UpdateWorkflowMutation.TimerTasks)) + } else { + timer0, ok := gotUpdateExecReq.UpdateWorkflowMutation.TimerTasks[0].(*persistence.WorkflowTimeoutTask) + if !ok { + t.Fatalf("failed to cast TimerTask[0] to *persistence.WorkflowTimeoutTask, type is %T", timer0) + } + + timer1, ok := gotUpdateExecReq.UpdateWorkflowMutation.TimerTasks[1].(*persistence.DecisionTimeoutTask) + if !ok { + t.Fatalf("failed to cast TimerTask[0] to *persistence.WorkflowTimeoutTask, type is %T", timer1) + } + } + }) + } +} diff --git a/service/history/engine/testdata/engine_for_tests.go b/service/history/engine/testdata/engine_for_tests.go index 100bd640cb0..06fa64d9993 100644 --- a/service/history/engine/testdata/engine_for_tests.go +++ b/service/history/engine/testdata/engine_for_tests.go @@ -24,6 +24,7 @@ import ( "testing" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/mock" "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" "go.uber.org/cadence/.gen/go/cadence/workflowservicetest" @@ -47,7 +48,7 @@ import ( type EngineForTest struct { Engine engine.Engine // Add mocks or other fields here - Ctx *shard.TestContext + ShardCtx *shard.TestContext } // NewEngineFn is defined as an alias for engineimpl.NewEngineWithShardContext to avoid circular dependency @@ -90,6 +91,12 @@ func NewEngineForTest(t *testing.T, newEngineFn NewEngineFn) *EngineForTest { domainCache.EXPECT().RegisterDomainChangeCallback(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() domainCache.EXPECT().UnregisterDomainChangeCallback(gomock.Any()).Times(1) + executionMgr := shardCtx.Resource.ExecutionMgr + // RangeCompleteReplicationTask is called by taskProcessorImpl's background loop + executionMgr. + On("RangeCompleteReplicationTask", mock.Anything, mock.Anything). + Return(&persistence.RangeCompleteReplicationTaskResponse{}, nil) + membershipResolver := shardCtx.Resource.MembershipResolver membershipResolver.EXPECT().MemberCount(gomock.Any()).Return(1, nil).AnyTimes() @@ -180,7 +187,7 @@ func NewEngineForTest(t *testing.T, newEngineFn NewEngineFn) *EngineForTest { t.Cleanup(historyEventNotifier.Stop) return &EngineForTest{ - Engine: engine, - Ctx: shardCtx, + Engine: engine, + ShardCtx: shardCtx, } }