Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add domain_type and cluster_groups tags #4990

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ import (
"hash/fnv"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -99,6 +101,7 @@ type (
domainCache struct {
status int32
shutdownChan chan struct{}
clusterGroup string
cacheNameToID *atomic.Value
cacheByID *atomic.Value
domainManager persistence.DomainManager
Expand Down Expand Up @@ -141,13 +144,15 @@ type (
// NewDomainCache creates a new instance of cache for holding onto domain information to reduce the load on persistence
func NewDomainCache(
domainManager persistence.DomainManager,
metadata cluster.Metadata,
metricsClient metrics.Client,
logger log.Logger,
) DomainCache {

cache := &domainCache{
status: domainCacheInitialized,
shutdownChan: make(chan struct{}),
clusterGroup: getClusterGroupIdentifier(metadata),
cacheNameToID: &atomic.Value{},
cacheByID: &atomic.Value{},
domainManager: domainManager,
Expand All @@ -163,6 +168,15 @@ func NewDomainCache(
return cache
}

func getClusterGroupIdentifier(metadata cluster.Metadata) string {
var clusters []string
for cluster := range metadata.GetEnabledClusterInfo() {
clusters = append(clusters, cluster)
}
sort.Strings(clusters)
return strings.Join(clusters, "_")
}

func newDomainCache() Cache {
return NewSimple(&SimpleOptions{
InitialCapacity: domainCacheInitialSize,
Expand Down Expand Up @@ -464,6 +478,8 @@ UpdateLoop:

c.scope.Tagged(
metrics.DomainTag(nextEntry.info.Name),
metrics.DomainTypeTag(nextEntry.isGlobalDomain),
metrics.ClusterGroupTag(c.clusterGroup),
metrics.ActiveClusterTag(nextEntry.replicationConfig.ActiveClusterName),
).UpdateGauge(metrics.ActiveClusterGauge, 1)

Expand Down
2 changes: 1 addition & 1 deletion common/cache/domainCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *domainCacheSuite) SetupTest() {
s.logger = loggerimpl.NewLoggerForTest(s.Suite)
s.metadataMgr = &mocks.MetadataManager{}
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
s.domainCache = NewDomainCache(s.metadataMgr, metricsClient, s.logger).(*domainCache)
s.domainCache = NewDomainCache(s.metadataMgr, cluster.GetTestClusterMetadata(true), metricsClient, s.logger).(*domainCache)

s.now = time.Now()
s.domainCache.timeSource = clock.NewEventTimeSource().Update(s.now)
Expand Down
19 changes: 19 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (

instance = "instance"
domain = "domain"
domainType = "domain_type"
clusterGroup = "cluster_group"
sourceCluster = "source_cluster"
targetCluster = "target_cluster"
activeCluster = "active_cluster"
Expand Down Expand Up @@ -83,11 +85,28 @@ func DomainTag(value string) Tag {
return metricWithUnknown(domain, value)
}

// DomainTypeTag returns a tag for domain type.
// This allows differentiating between global/local domains.
func DomainTypeTag(isGlobal bool) Tag {
var value string
if isGlobal {
value = "global"
} else {
value = "local"
}
return simpleMetric{key: domainType, value: value}
}

// DomainUnknownTag returns a new domain:unknown tag-value
func DomainUnknownTag() Tag {
return DomainTag("")
}

// ClusterGroupTag return a new cluster group tag
func ClusterGroupTag(value string) Tag {
return simpleMetric{key: clusterGroup, value: value}
}

// InstanceTag returns a new instance tag
func InstanceTag(value string) Tag {
return simpleMetric{key: instance, value: value}
Expand Down
1 change: 1 addition & 0 deletions common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func New(

domainCache := cache.NewDomainCache(
persistenceBean.GetDomainManager(),
params.ClusterMetadata,
params.MetricsClient,
logger,
)
Expand Down
4 changes: 2 additions & 2 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,15 +605,15 @@ func (c *cadenceImpl) startWorker(hosts map[string][]membership.HostInfo, startW
var replicatorDomainCache cache.DomainCache
if c.workerConfig.EnableReplicator {
metadataManager := persistence.NewDomainPersistenceMetricsClient(c.domainManager, service.GetMetricsClient(), c.logger, &c.persistenceConfig)
replicatorDomainCache = cache.NewDomainCache(metadataManager, service.GetMetricsClient(), service.GetLogger())
replicatorDomainCache = cache.NewDomainCache(metadataManager, c.clusterMetadata, service.GetMetricsClient(), service.GetLogger())
replicatorDomainCache.Start()
c.startWorkerReplicator(service)
}

var clientWorkerDomainCache cache.DomainCache
if c.workerConfig.EnableArchiver {
metadataProxyManager := persistence.NewDomainPersistenceMetricsClient(c.domainManager, service.GetMetricsClient(), c.logger, &c.persistenceConfig)
clientWorkerDomainCache = cache.NewDomainCache(metadataProxyManager, service.GetMetricsClient(), service.GetLogger())
clientWorkerDomainCache = cache.NewDomainCache(metadataProxyManager, c.clusterMetadata, service.GetMetricsClient(), service.GetLogger())
clientWorkerDomainCache.Start()
c.startWorkerClientWorker(params, service, clientWorkerDomainCache)
}
Expand Down