Adding more tests to shard/context.go (cadence-workflow#6052)
dkrotx authored and timl3136 committed Jun 6, 2024
1 parent 2b70478 commit 9c1a51c
Showing 1 changed file with 111 additions and 0 deletions.
service/history/shard/context_test.go (+111 / -0)
@@ -37,6 +37,7 @@ import (
"github.com/uber-go/tally"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/testlogger"
@@ -55,6 +56,10 @@ const (
testRangeID = 1
testTransferMaxReadLevel = 10
testMaxTransferSequenceNumber = 100
testCluster = "test-cluster"
testDomain = "test-domain"
testDomainID = "test-domain-id"
testWorkflowID = "test-workflow-id"
)

type (
@@ -101,6 +106,7 @@ func (s *contextTestSuite) newContext() *contextImpl {
// when acquiring the shard if they are nil
ClusterTransferAckLevel: make(map[string]int64),
ClusterTimerAckLevel: make(map[string]time.Time),
ClusterReplicationLevel: make(map[string]int64),
TransferProcessingQueueStates: &types.ProcessingQueueStates{
StatesByCluster: make(map[string][]*types.ProcessingQueueState),
},
@@ -195,6 +201,40 @@ func (s *contextTestSuite) TestRenewRangeLockedRetriesExceeded() {
s.Error(err)
}

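// UpdateClusterReplicationLevel should persist the new level through UpdateShard
// and make it visible via GetClusterReplicationLevel.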
func (s *contextTestSuite) TestUpdateClusterReplicationLevel_Succeeds() {
lastTaskID := int64(123)

s.mockShardManager.On("UpdateShard", mock.Anything, mock.Anything).Once().Return(nil)
err := s.context.UpdateClusterReplicationLevel(testCluster, lastTaskID)
s.Require().NoError(err)

s.Equal(lastTaskID, s.context.GetClusterReplicationLevel(testCluster))
}

func (s *contextTestSuite) TestUpdateClusterReplicationLevel_FailsWhenUpdateShardFail() {
ownershipLostError := &persistence.ShardOwnershipLostError{ShardID: testShardID, Msg: "testing ownership lost"}
shardClosed := false
closeCallbackCalled := make(chan bool)

s.mockShardManager.On("UpdateShard", mock.Anything, mock.Anything).Return(ownershipLostError)
s.context.closeCallback = func(int, *historyShardsItem) {
shardClosed = true
closeCallbackCalled <- true
}

err := s.context.UpdateClusterReplicationLevel(testCluster, int64(123))

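// The close callback is expected to fire asynchronously on ShardOwnershipLostError,
// so wait for it with a one-second timeout.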
select {
case <-closeCallbackCalled:
break
case <-time.NewTimer(time.Second).C:
s.T().Fatal("close callback is still not called")
}

s.Require().ErrorContains(err, ownershipLostError.Msg)
s.True(shardClosed, "the shard should have been closed on ShardOwnershipLostError")
}

func (s *contextTestSuite) TestReplicateFailoverMarkersSuccess() {
s.mockResource.ExecutionMgr.On("CreateFailoverMarkerTasks", mock.Anything, mock.Anything).Once().Return(nil)

@@ -203,6 +243,77 @@ func (s *contextTestSuite) TestReplicateFailoverMarkersSuccess() {
s.NoError(err)
}

func (s *contextTestSuite) TestCreateWorkflowExecution_Succeeds() {
ctx := context.Background()
request := &persistence.CreateWorkflowExecutionRequest{
DomainName: testDomain,
NewWorkflowSnapshot: persistence.WorkflowSnapshot{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
DomainID: testDomainID,
WorkflowID: testWorkflowID,
},
},
}

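// CreateWorkflowExecution looks up the domain, so stub the domain cache
// with a local entry for testDomainID.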
domainCacheEntry := cache.NewLocalDomainCacheEntryForTest(
&persistence.DomainInfo{ID: testDomainID},
&persistence.DomainConfig{Retention: 7},
testCluster,
)
s.mockResource.DomainCache.EXPECT().GetDomainByID(testDomainID).Return(domainCacheEntry, nil)

persistenceResponse := &persistence.CreateWorkflowExecutionResponse{}
s.mockResource.ExecutionMgr.On("CreateWorkflowExecution", ctx, mock.Anything).Once().Return(persistenceResponse, nil)

resp, err := s.context.CreateWorkflowExecution(ctx, request)
s.NoError(err)
s.NotNil(resp)
}

func (s *contextTestSuite) TestValidateAndUpdateFailoverMarkers() {
domainFailoverVersion := 100
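// Two domain cache entries: in the first the domain is active in the alternative
// cluster, so the failover marker is buffered as pending; in the second it is active
// in the current cluster, so pending markers can be cleaned up.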
domainCacheEntryInactiveCluster := cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{ID: testDomainID},
&persistence.DomainConfig{Retention: 7},
&persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestAlternativeClusterName, // domain is active in the alternative cluster, not in the shard's current cluster
Clusters: []*persistence.ClusterReplicationConfig{
{ClusterName: cluster.TestCurrentClusterName},
{ClusterName: cluster.TestAlternativeClusterName},
},
},
int64(domainFailoverVersion),
)
domainCacheEntryActiveCluster := cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{ID: testDomainID},
&persistence.DomainConfig{Retention: 7},
&persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName, // domain is active in the current cluster
Clusters: []*persistence.ClusterReplicationConfig{
{ClusterName: cluster.TestCurrentClusterName},
{ClusterName: cluster.TestAlternativeClusterName},
},
},
int64(domainFailoverVersion),
)
s.mockResource.DomainCache.EXPECT().GetDomainByID(testDomainID).Return(domainCacheEntryInactiveCluster, nil)

failoverMarker := types.FailoverMarkerAttributes{
DomainID: testDomainID,
FailoverVersion: 101,
}

s.mockShardManager.On("UpdateShard", mock.Anything, mock.Anything).Return(nil)
s.NoError(s.context.AddingPendingFailoverMarker(&failoverMarker))
s.Require().Len(s.context.shardInfo.PendingFailoverMarkers, 1, "we should have one failover marker saved since the cluster is not active")

s.mockResource.DomainCache.EXPECT().GetDomainByID(testDomainID).Return(domainCacheEntryActiveCluster, nil)

pendingFailoverMarkers, err := s.context.ValidateAndUpdateFailoverMarkers()
s.NoError(err)
s.Empty(pendingFailoverMarkers, "all pending failover tasks should be cleaned up")
}

func (s *contextTestSuite) TestGetAndUpdateProcessingQueueStates() {
clusterName := cluster.TestCurrentClusterName
var initialQueueStates [][]*types.ProcessingQueueState