Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[matching] Simplity poller extraction in task list manager #6333

Merged
merged 2 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 55 additions & 15 deletions service/matching/poller/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,27 @@ type (
IsolationGroup string
}

History struct {
History interface {
UpdatePollerInfo(id Identity, info Info)
HasPollerAfter(earliestAccessTime time.Time) bool
GetPollerInfo(earliestAccessTime time.Time) []*types.PollerInfo
GetPollerIsolationGroups(earliestAccessTime time.Time) map[string]int
}

history struct {
// poller ID -> pollerInfo
// pollers map[pollerID]pollerInfo
history cache.Cache
historyCache cache.Cache

// OnHistoryUpdatedFunc is a function called when the poller history was updated
// OnHistoryUpdatedFunc is a function called when the poller historyCache was updated
onHistoryUpdatedFunc HistoryUpdatedFunc
}

// HistoryUpdatedFunc is a type for notifying applications when the poller history was updated
// HistoryUpdatedFunc is a type for notifying applications when the poller historyCache was updated
HistoryUpdatedFunc func()
)

func NewPollerHistory(historyUpdatedFunc HistoryUpdatedFunc, timeSource clock.TimeSource) *History {
func NewPollerHistory(historyUpdatedFunc HistoryUpdatedFunc, timeSource clock.TimeSource) History {
opts := &cache.Options{
InitialCapacity: pollerHistoryInitSize,
TTL: pollerHistoryTTL,
Expand All @@ -65,30 +72,60 @@ func NewPollerHistory(historyUpdatedFunc HistoryUpdatedFunc, timeSource clock.Ti
TimeSource: timeSource,
}

return &History{
history: cache.New(opts),
return &history{
historyCache: cache.New(opts),
onHistoryUpdatedFunc: historyUpdatedFunc,
}
}

func (pollers *History) UpdatePollerInfo(id Identity, info Info) {
pollers.history.Put(id, &info)
func (pollers *history) UpdatePollerInfo(id Identity, info Info) {
pollers.historyCache.Put(id, &info)
if pollers.onHistoryUpdatedFunc != nil {
pollers.onHistoryUpdatedFunc()
}
}

func (pollers *History) GetPollerInfo(earliestAccessTime time.Time) []*types.PollerInfo {
func (pollers *history) HasPollerAfter(earliestAccessTime time.Time) bool {
if pollers.historyCache.Size() == 0 {
return false
}

noTimeFilter := earliestAccessTime.IsZero()

ite := pollers.historyCache.Iterator()
defer ite.Close()

for ite.HasNext() {
entry := ite.Next()
lastAccessTime := entry.CreateTime()
if noTimeFilter || earliestAccessTime.Before(lastAccessTime) {
return true
}
}

return false
}

func (pollers *history) GetPollerInfo(earliestAccessTime time.Time) []*types.PollerInfo {
var result []*types.PollerInfo
ite := pollers.history.Iterator()
// optimistic size get, it can change before Iterator call.
size := pollers.historyCache.Size()

ite := pollers.historyCache.Iterator()
defer ite.Close()

noTimeFilter := earliestAccessTime.IsZero()
if noTimeFilter {
result = make([]*types.PollerInfo, 0, size)
}

for ite.HasNext() {
entry := ite.Next()
key := entry.Key().(Identity)
value := entry.Value().(*Info)
// TODO add IP, T1396795
lastAccessTime := entry.CreateTime()
if earliestAccessTime.Before(lastAccessTime) {
if noTimeFilter || earliestAccessTime.Before(lastAccessTime) {
result = append(result, &types.PollerInfo{
Identity: string(key),
LastAccessTime: common.Int64Ptr(lastAccessTime.UnixNano()),
Expand All @@ -99,15 +136,18 @@ func (pollers *History) GetPollerInfo(earliestAccessTime time.Time) []*types.Pol
return result
}

func (pollers *History) GetPollerIsolationGroups(earliestAccessTime time.Time) map[string]int {
func (pollers *history) GetPollerIsolationGroups(earliestAccessTime time.Time) map[string]int {
groupSet := make(map[string]int)
ite := pollers.history.Iterator()
ite := pollers.historyCache.Iterator()
defer ite.Close()

noTimeFilter := earliestAccessTime.IsZero()

for ite.HasNext() {
entry := ite.Next()
value := entry.Value().(*Info)
lastAccessTime := entry.CreateTime()
if earliestAccessTime.Before(lastAccessTime) {
if noTimeFilter || earliestAccessTime.Before(lastAccessTime) {
if value.IsolationGroup != "" {
groupSet[value.IsolationGroup]++
}
Expand Down
176 changes: 141 additions & 35 deletions service/matching/poller/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
func TestNewPollerHistory(t *testing.T) {
p := NewPollerHistory(nil, nil)
assert.NotNil(t, p)
assert.NotNil(t, p.history)
assert.NotNil(t, p.(*history).historyCache)
}

func TestUpdatePollerInfo(t *testing.T) {
Expand All @@ -48,46 +48,152 @@ func TestUpdatePollerInfo(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockCache := cache.NewMockCache(mockCtrl)
mockCache.EXPECT().Put(Identity("test"), &Info{IsolationGroup: "dca1"}).Return(nil)
p := &History{
p := &history{
onHistoryUpdatedFunc: updateFn,
history: mockCache,
historyCache: mockCache,
}
p.UpdatePollerInfo(Identity("test"), Info{IsolationGroup: "dca1"})
assert.True(t, updated)
}

func TestHistory_HasPollerAfter(t *testing.T) {
t.Run("empty historyCache", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockCache := cache.NewMockCache(mockCtrl)
mockCache.EXPECT().Size().Return(0)

p := &history{
historyCache: mockCache,
}
assert.False(t, p.HasPollerAfter(time.Now()))
})
t.Run("no poller after", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockCache := cache.NewMockCache(mockCtrl)
mockCache.EXPECT().Size().Return(2)
mockIter := cache.NewMockIterator(mockCtrl)
mockEntry := cache.NewMockEntry(mockCtrl)

now := time.Now()

mockCache.EXPECT().Iterator().Return(mockIter)
gomock.InOrder(
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().CreateTime().Return(now.Add(-time.Second)),
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().CreateTime().Return(now.Add(-2*time.Second)),
mockIter.EXPECT().HasNext().Return(false),
mockIter.EXPECT().Close(),
)

p := &history{
historyCache: mockCache,
}
assert.False(t, p.HasPollerAfter(now))
})
t.Run("has poller after", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockCache := cache.NewMockCache(mockCtrl)
mockCache.EXPECT().Size().Return(2)
mockIter := cache.NewMockIterator(mockCtrl)
mockEntry := cache.NewMockEntry(mockCtrl)

now := time.Now()

mockCache.EXPECT().Iterator().Return(mockIter)
gomock.InOrder(
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().CreateTime().Return(now.Add(-time.Second)),
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().CreateTime().Return(now.Add(time.Second)),
mockIter.EXPECT().Close(),
)

p := &history{
historyCache: mockCache,
}
assert.True(t, p.HasPollerAfter(now))
})
}

func TestGetPollerInfo(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockCache := cache.NewMockCache(mockCtrl)
mockIter := cache.NewMockIterator(mockCtrl)
mockEntry := cache.NewMockEntry(mockCtrl)
t.Run("with_time_filter", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockCache := cache.NewMockCache(mockCtrl)
mockIter := cache.NewMockIterator(mockCtrl)
mockEntry := cache.NewMockEntry(mockCtrl)

mockCache.EXPECT().Iterator().Return(mockIter)
gomock.InOrder(
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().Key().Return(Identity("test0")),
mockEntry.EXPECT().Value().Return(&Info{IsolationGroup: "dca1", RatePerSecond: 1.0}),
mockEntry.EXPECT().CreateTime().Return(time.UnixMilli(1000)),
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().Key().Return(Identity("test1")),
mockEntry.EXPECT().Value().Return(&Info{IsolationGroup: "dca2", RatePerSecond: 2.0}),
mockEntry.EXPECT().CreateTime().Return(time.UnixMilli(0)),
mockIter.EXPECT().HasNext().Return(false),
mockIter.EXPECT().Close(),
)
p := &History{
history: mockCache,
}
info := p.GetPollerInfo(time.UnixMilli(500))
assert.Equal(t, []*types.PollerInfo{
{
Identity: "test0",
LastAccessTime: common.Ptr(time.UnixMilli(1000).UnixNano()),
RatePerSecond: 1.0,
},
}, info)
mockCache.EXPECT().Size().Return(2)

mockCache.EXPECT().Iterator().Return(mockIter)
gomock.InOrder(
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().Key().Return(Identity("test0")),
mockEntry.EXPECT().Value().Return(&Info{IsolationGroup: "dca1", RatePerSecond: 1.0}),
mockEntry.EXPECT().CreateTime().Return(time.UnixMilli(1000)),
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().Key().Return(Identity("test1")),
mockEntry.EXPECT().Value().Return(&Info{IsolationGroup: "dca2", RatePerSecond: 2.0}),
mockEntry.EXPECT().CreateTime().Return(time.UnixMilli(0)),
mockIter.EXPECT().HasNext().Return(false),
mockIter.EXPECT().Close(),
)
p := &history{
historyCache: mockCache,
}
info := p.GetPollerInfo(time.UnixMilli(500))
assert.Equal(t, []*types.PollerInfo{
{
Identity: "test0",
LastAccessTime: common.Ptr(time.UnixMilli(1000).UnixNano()),
RatePerSecond: 1.0,
},
}, info)
})
t.Run("no_time_filter", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockCache := cache.NewMockCache(mockCtrl)
mockIter := cache.NewMockIterator(mockCtrl)
mockEntry := cache.NewMockEntry(mockCtrl)
mockCache.EXPECT().Size().Return(2)
mockCache.EXPECT().Iterator().Return(mockIter)
gomock.InOrder(
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().Key().Return(Identity("test0")),
mockEntry.EXPECT().Value().Return(&Info{IsolationGroup: "dca1", RatePerSecond: 1.0}),
mockEntry.EXPECT().CreateTime().Return(time.UnixMilli(1000)),
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().Key().Return(Identity("test1")),
mockEntry.EXPECT().Value().Return(&Info{IsolationGroup: "dca2", RatePerSecond: 2.0}),
mockEntry.EXPECT().CreateTime().Return(time.UnixMilli(0)),
mockIter.EXPECT().HasNext().Return(false),
mockIter.EXPECT().Close(),
)
p := &history{
historyCache: mockCache,
}
info := p.GetPollerInfo(time.Time{})
assert.Equal(t, []*types.PollerInfo{
{
Identity: "test0",
LastAccessTime: common.Ptr(time.UnixMilli(1000).UnixNano()),
RatePerSecond: 1.0,
},
{
Identity: "test1",
LastAccessTime: common.Ptr(time.UnixMilli(0).UnixNano()),
RatePerSecond: 2.0,
},
}, info)
})
}

func TestGetPollerIsolationGroups(t *testing.T) {
Expand All @@ -109,8 +215,8 @@ func TestGetPollerIsolationGroups(t *testing.T) {
mockIter.EXPECT().HasNext().Return(false),
mockIter.EXPECT().Close(),
)
p := &History{
history: mockCache,
p := &history{
historyCache: mockCache,
}
groups := p.GetPollerIsolationGroups(time.UnixMilli(500))
assert.Equal(t, map[string]int{"dca1": 1}, groups)
Expand Down
42 changes: 24 additions & 18 deletions service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type (
timeSource clock.TimeSource
domainName string
// pollerHistory stores poller which poll from this tasklist in last few minutes
pollerHistory *poller.History
pollerHistory poller.History
// outstandingPollsMap is needed to keep track of all outstanding pollers for a
// particular tasklist. PollerID generated by frontend is used as the key and
// CancelFunc is the value. This is used to cancel the context to unblock any
Expand Down Expand Up @@ -400,14 +400,8 @@ func (c *taskListManagerImpl) getTask(ctx context.Context, maxDispatchPerSecond
if pollerID != "" {
// Found pollerID on context, add it to the map to allow it to be canceled in
// response to CancelPoller call
c.outstandingPollsLock.Lock()
c.outstandingPollsMap[pollerID] = outstandingPollerInfo{isolationGroup: isolationGroup, cancel: cancel}
c.outstandingPollsLock.Unlock()
defer func() {
c.outstandingPollsLock.Lock()
delete(c.outstandingPollsMap, pollerID)
c.outstandingPollsLock.Unlock()
}()
c.addOutstandingPoller(pollerID, isolationGroup, cancel)
defer c.removeOutstandingPoller(pollerID)()
}

identity := IdentityFromContext(ctx)
Expand Down Expand Up @@ -452,15 +446,7 @@ func (c *taskListManagerImpl) GetAllPollerInfo() []*types.PollerInfo {

// HasPollerAfter checks if there is any poller after a timestamp
func (c *taskListManagerImpl) HasPollerAfter(accessTime time.Time) bool {
inflightPollerCount := 0
c.outstandingPollsLock.Lock()
inflightPollerCount = len(c.outstandingPollsMap)
c.outstandingPollsLock.Unlock()
if inflightPollerCount > 0 {
return true
}
recentPollers := c.pollerHistory.GetPollerInfo(accessTime)
return len(recentPollers) > 0
return c.hasOutstandingPolls() || c.pollerHistory.HasPollerAfter(accessTime)
}

func (c *taskListManagerImpl) CancelPoller(pollerID string) {
Expand Down Expand Up @@ -528,6 +514,26 @@ func (c *taskListManagerImpl) TaskListID() *Identifier {
return c.taskListID
}

func (c *taskListManagerImpl) addOutstandingPoller(pollerID string, isolationGroup string, cancel context.CancelFunc) {
c.outstandingPollsLock.Lock()
defer c.outstandingPollsLock.Unlock()
c.outstandingPollsMap[pollerID] = outstandingPollerInfo{isolationGroup: isolationGroup, cancel: cancel}
}

func (c *taskListManagerImpl) removeOutstandingPoller(pollerID string) func() {
return func() {
c.outstandingPollsLock.Lock()
defer c.outstandingPollsLock.Unlock()
delete(c.outstandingPollsMap, pollerID)
}
}

func (c *taskListManagerImpl) hasOutstandingPolls() bool {
c.outstandingPollsLock.Lock()
defer c.outstandingPollsLock.Unlock()
return len(c.outstandingPollsMap) > 0
}

// Retry operation on transient error. On rangeID update by another process calls c.Stop().
func (c *taskListManagerImpl) executeWithRetry(
operation func() (interface{}, error),
Expand Down
Loading
Loading