Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test for Common/domain/replication_queue: GetMessagesfromDLQ & AckLevel #5734

Merged
merged 4 commits into from
Mar 6, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 114 additions & 2 deletions common/domain/replication_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
package domain

import (
"bytes"
"context"
"encoding/binary"
"errors"
"testing"

Expand All @@ -32,6 +34,10 @@ import (
"github.com/uber/cadence/common/types"
)

const (
preambleVersion0 byte = 0x59
)

func TestReplicationQueueImpl_Publish(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -122,15 +128,13 @@ func TestGetReplicationMessages(t *testing.T) {
name string
lastID int64
maxCount int
task *types.ReplicationTask
wantErr bool
setupMock func(q *persistence.MockQueueManager)
}{
{
name: "successful message retrieval",
lastID: 100,
maxCount: 10,
task: &types.ReplicationTask{},
wantErr: false,
setupMock: func(q *persistence.MockQueueManager) {
q.EXPECT().ReadMessages(gomock.Any(), gomock.Eq(int64(100)), gomock.Eq(10)).Return(persistence.QueueMessageList{}, nil)
Expand Down Expand Up @@ -210,3 +214,111 @@ func TestUpdateAckLevel(t *testing.T) {
})
}
}

func TestReplicationQueueImpl_GetAckLevels(t *testing.T) {
tests := []struct {
name string
want map[string]int64
wantErr bool
setupMock func(q *persistence.MockQueueManager)
}{
{
name: "successful ack levels retrieval",
want: map[string]int64{"testCluster": 100},
wantErr: false,
setupMock: func(q *persistence.MockQueueManager) {
q.EXPECT().GetAckLevels(gomock.Any()).Return(map[string]int64{"testCluster": 100}, nil)
},
},
{
name: "ack levels retrieval fails",
wantErr: true,
setupMock: func(q *persistence.MockQueueManager) {
q.EXPECT().GetAckLevels(gomock.Any()).Return(nil, errors.New("retrieval error"))
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockQueue := persistence.NewMockQueueManager(ctrl)
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)
tt.setupMock(mockQueue)
got, err := rq.GetAckLevels(context.Background())
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.want, got)
}
ctrl.Finish()
})
}
}

func mockEncodeReplicationTask(sourceTaskID int64) ([]byte, error) {
var buf bytes.Buffer
buf.WriteByte(preambleVersion0)
binary.Write(&buf, binary.BigEndian, sourceTaskID)
return buf.Bytes(), nil
}

func TestGetMessagesFromDLQ(t *testing.T) {

tests := []struct {
name string
firstID int64
lastID int64
pageSize int
pageToken []byte
taskID int64
wantErr bool
}{
{
name: "successful message retrieval",
firstID: 100,
lastID: 200,
pageSize: 10,
pageToken: []byte("token"),
taskID: 12345,
wantErr: false,
},
{
name: "read messages fails",
firstID: 100,
lastID: 200,
pageSize: 10,
pageToken: []byte("token"),
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockQueue := persistence.NewMockQueueManager(ctrl)
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)

if !tt.wantErr {
encodedData, _ := mockEncodeReplicationTask(tt.taskID)
messages := []*persistence.QueueMessage{
{ID: 1, Payload: encodedData},
}
mockQueue.EXPECT().ReadMessagesFromDLQ(gomock.Any(), tt.firstID, tt.lastID, tt.pageSize, tt.pageToken).Return(messages, []byte("nextToken"), nil)
} else {
mockQueue.EXPECT().ReadMessagesFromDLQ(gomock.Any(), tt.firstID, tt.lastID, tt.pageSize, tt.pageToken).Return(nil, nil, errors.New("read error"))
}

replicationTasks, token, err := rq.GetMessagesFromDLQ(context.Background(), tt.firstID, tt.lastID, tt.pageSize, tt.pageToken)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Len(t, replicationTasks, 1, "Expected one replication task to be returned")
assert.Equal(t, []byte("nextToken"), token, "Expected token to match 'nextToken'")
}
ctrl.Finish()
})
}
}
Loading