Skip to content

Commit

Permalink
Add domain_type and cluster_groups tags (#4990)
Browse files Browse the repository at this point in the history
* Add domain_type tag

* Add cluster_group tag
  • Loading branch information
vytautas-karpavicius authored Sep 8, 2022
1 parent c3beb64 commit 5fd0127
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 3 deletions.
16 changes: 16 additions & 0 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ import (
"hash/fnv"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -99,6 +101,7 @@ type (
domainCache struct {
status int32
shutdownChan chan struct{}
clusterGroup string
cacheNameToID *atomic.Value
cacheByID *atomic.Value
domainManager persistence.DomainManager
Expand Down Expand Up @@ -141,13 +144,15 @@ type (
// NewDomainCache creates a new instance of cache for holding onto domain information to reduce the load on persistence
func NewDomainCache(
domainManager persistence.DomainManager,
metadata cluster.Metadata,
metricsClient metrics.Client,
logger log.Logger,
) DomainCache {

cache := &domainCache{
status: domainCacheInitialized,
shutdownChan: make(chan struct{}),
clusterGroup: getClusterGroupIdentifier(metadata),
cacheNameToID: &atomic.Value{},
cacheByID: &atomic.Value{},
domainManager: domainManager,
Expand All @@ -163,6 +168,15 @@ func NewDomainCache(
return cache
}

func getClusterGroupIdentifier(metadata cluster.Metadata) string {
var clusters []string
for cluster := range metadata.GetEnabledClusterInfo() {
clusters = append(clusters, cluster)
}
sort.Strings(clusters)
return strings.Join(clusters, "_")
}

func newDomainCache() Cache {
return NewSimple(&SimpleOptions{
InitialCapacity: domainCacheInitialSize,
Expand Down Expand Up @@ -464,6 +478,8 @@ UpdateLoop:

c.scope.Tagged(
metrics.DomainTag(nextEntry.info.Name),
metrics.DomainTypeTag(nextEntry.isGlobalDomain),
metrics.ClusterGroupTag(c.clusterGroup),
metrics.ActiveClusterTag(nextEntry.replicationConfig.ActiveClusterName),
).UpdateGauge(metrics.ActiveClusterGauge, 1)

Expand Down
2 changes: 1 addition & 1 deletion common/cache/domainCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *domainCacheSuite) SetupTest() {
s.logger = loggerimpl.NewLoggerForTest(s.Suite)
s.metadataMgr = &mocks.MetadataManager{}
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
s.domainCache = NewDomainCache(s.metadataMgr, metricsClient, s.logger).(*domainCache)
s.domainCache = NewDomainCache(s.metadataMgr, cluster.GetTestClusterMetadata(true), metricsClient, s.logger).(*domainCache)

s.now = time.Now()
s.domainCache.timeSource = clock.NewEventTimeSource().Update(s.now)
Expand Down
19 changes: 19 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (

instance = "instance"
domain = "domain"
domainType = "domain_type"
clusterGroup = "cluster_group"
sourceCluster = "source_cluster"
targetCluster = "target_cluster"
activeCluster = "active_cluster"
Expand Down Expand Up @@ -83,11 +85,28 @@ func DomainTag(value string) Tag {
return metricWithUnknown(domain, value)
}

// DomainTypeTag returns a tag for domain type.
// This allows differentiating between global/local domains.
func DomainTypeTag(isGlobal bool) Tag {
var value string
if isGlobal {
value = "global"
} else {
value = "local"
}
return simpleMetric{key: domainType, value: value}
}

// DomainUnknownTag returns a new domain:unknown tag-value
func DomainUnknownTag() Tag {
return DomainTag("")
}

// ClusterGroupTag return a new cluster group tag
func ClusterGroupTag(value string) Tag {
return simpleMetric{key: clusterGroup, value: value}
}

// InstanceTag returns a new instance tag
func InstanceTag(value string) Tag {
return simpleMetric{key: instance, value: value}
Expand Down
1 change: 1 addition & 0 deletions common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func New(

domainCache := cache.NewDomainCache(
persistenceBean.GetDomainManager(),
params.ClusterMetadata,
params.MetricsClient,
logger,
)
Expand Down
4 changes: 2 additions & 2 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,15 +605,15 @@ func (c *cadenceImpl) startWorker(hosts map[string][]membership.HostInfo, startW
var replicatorDomainCache cache.DomainCache
if c.workerConfig.EnableReplicator {
metadataManager := persistence.NewDomainPersistenceMetricsClient(c.domainManager, service.GetMetricsClient(), c.logger, &c.persistenceConfig)
replicatorDomainCache = cache.NewDomainCache(metadataManager, service.GetMetricsClient(), service.GetLogger())
replicatorDomainCache = cache.NewDomainCache(metadataManager, c.clusterMetadata, service.GetMetricsClient(), service.GetLogger())
replicatorDomainCache.Start()
c.startWorkerReplicator(service)
}

var clientWorkerDomainCache cache.DomainCache
if c.workerConfig.EnableArchiver {
metadataProxyManager := persistence.NewDomainPersistenceMetricsClient(c.domainManager, service.GetMetricsClient(), c.logger, &c.persistenceConfig)
clientWorkerDomainCache = cache.NewDomainCache(metadataProxyManager, service.GetMetricsClient(), service.GetLogger())
clientWorkerDomainCache = cache.NewDomainCache(metadataProxyManager, c.clusterMetadata, service.GetMetricsClient(), service.GetLogger())
clientWorkerDomainCache.Start()
c.startWorkerClientWorker(params, service, clientWorkerDomainCache)
}
Expand Down

0 comments on commit 5fd0127

Please sign in to comment.