Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a dynamic config for cassandra all consistency level delete #5000

Merged
merged 1 commit into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/dynamicconfig/configstore/config_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewConfigStoreClient(clientCfg *csc.ClientConfig, persistenceCfg *config.Pe
}

func newConfigStoreClient(clientCfg *csc.ClientConfig, persistenceCfg *config.NoSQL, logger log.Logger, doneCh chan struct{}) (*configStoreClient, error) {
store, err := nosql.NewNoSQLConfigStore(*persistenceCfg, logger)
store, err := nosql.NewNoSQLConfigStore(*persistenceCfg, logger, nil)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *configStoreClientSuite) SetupSuite() {

mockPlugin := nosqlplugin.NewMockPlugin(s.mockController)
mockPlugin.EXPECT().
CreateDB(gomock.Any(), gomock.Any()).
CreateDB(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, nil).AnyTimes()
nosql.RegisterPlugin("cassandra", mockPlugin)
}
Expand Down
7 changes: 7 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1744,6 +1744,8 @@ const (
// Default value: false
EnablePendingActivityValidation

EnableCassandraAllConsistencyLevelDelete

// LastBoolKey must be the last one in this const group
LastBoolKey
)
Expand Down Expand Up @@ -3730,6 +3732,11 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "Enables pending activity count limiting/validation",
DefaultValue: false,
},
EnableCassandraAllConsistencyLevelDelete: DynamicBool{
KeyName: "system.enableCassandraAllConsistencyLevelDelete",
Description: "Uses all consistency level for Cassandra delete operations",
DefaultValue: false,
},
}

var FloatKeys = map[FloatKey]DynamicFloat{
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
defaultDataStore := Datastore{ratelimit: limiters[f.config.DefaultStore]}
switch {
case defaultCfg.NoSQL != nil:
defaultDataStore.factory = nosql.NewFactory(*defaultCfg.NoSQL, clusterName, f.logger)
defaultDataStore.factory = nosql.NewFactory(*defaultCfg.NoSQL, clusterName, f.logger, f.dc)
case defaultCfg.SQL != nil:
if defaultCfg.SQL.EncodingType == "" {
defaultCfg.SQL.EncodingType = string(common.EncodingTypeThriftRW)
Expand Down Expand Up @@ -454,7 +454,7 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
visibilityDataStore := Datastore{ratelimit: limiters[f.config.VisibilityStore]}
switch {
case visibilityCfg.NoSQL != nil:
visibilityDataStore.factory = nosql.NewFactory(*visibilityCfg.NoSQL, clusterName, f.logger)
visibilityDataStore.factory = nosql.NewFactory(*visibilityCfg.NoSQL, clusterName, f.logger, f.dc)
case visibilityCfg.SQL != nil:
var decodingTypes []common.EncodingType
for _, dt := range visibilityCfg.SQL.DecodingTypes {
Expand Down
6 changes: 4 additions & 2 deletions common/persistence/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import (
type (
// DynamicConfiguration represents dynamic configuration for persistence layer
DynamicConfiguration struct {
EnableSQLAsyncTransaction dynamicconfig.BoolPropertyFn
EnableSQLAsyncTransaction dynamicconfig.BoolPropertyFn
EnableCassandraAllConsistencyLevelDelete dynamicconfig.BoolPropertyFn
}
)

// NewDynamicConfiguration returns new config with default values
func NewDynamicConfiguration(dc *dynamicconfig.Collection) *DynamicConfiguration {
return &DynamicConfiguration{
EnableSQLAsyncTransaction: dc.GetBoolProperty(dynamicconfig.EnableSQLAsyncTransaction),
EnableSQLAsyncTransaction: dc.GetBoolProperty(dynamicconfig.EnableSQLAsyncTransaction),
EnableCassandraAllConsistencyLevelDelete: dc.GetBoolProperty(dynamicconfig.EnableCassandraAllConsistencyLevelDelete),
}
}
23 changes: 13 additions & 10 deletions common/persistence/nosql/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type (
clusterName string
logger log.Logger
execStoreFactory *executionStoreFactory
dc *p.DynamicConfiguration
}

executionStoreFactory struct {
Expand All @@ -47,32 +48,33 @@ type (

// NewFactory returns an instance of a factory object which can be used to create
// datastores that are backed by cassandra
func NewFactory(cfg config.Cassandra, clusterName string, logger log.Logger) *Factory {
func NewFactory(cfg config.Cassandra, clusterName string, logger log.Logger, dc *p.DynamicConfiguration) *Factory {
return &Factory{
cfg: cfg,
clusterName: clusterName,
logger: logger,
dc: dc,
}
}

// NewTaskStore returns a new task store
func (f *Factory) NewTaskStore() (p.TaskStore, error) {
return newNoSQLTaskStore(f.cfg, f.logger)
return newNoSQLTaskStore(f.cfg, f.logger, f.dc)
}

// NewShardStore returns a new shard store
func (f *Factory) NewShardStore() (p.ShardStore, error) {
return newNoSQLShardStore(f.cfg, f.clusterName, f.logger)
return newNoSQLShardStore(f.cfg, f.clusterName, f.logger, f.dc)
}

// NewHistoryStore returns a new history store
func (f *Factory) NewHistoryStore() (p.HistoryStore, error) {
return newNoSQLHistoryStore(f.cfg, f.logger)
return newNoSQLHistoryStore(f.cfg, f.logger, f.dc)
}

// NewDomainStore returns a metadata store that understands only v2
func (f *Factory) NewDomainStore() (p.DomainStore, error) {
return newNoSQLDomainStore(f.cfg, f.clusterName, f.logger)
return newNoSQLDomainStore(f.cfg, f.clusterName, f.logger, f.dc)
}

// NewExecutionStore returns an ExecutionStore for a given shardID
Expand All @@ -86,17 +88,17 @@ func (f *Factory) NewExecutionStore(shardID int) (p.ExecutionStore, error) {

// NewVisibilityStore returns a visibility store
func (f *Factory) NewVisibilityStore(sortByCloseTime bool) (p.VisibilityStore, error) {
return newNoSQLVisibilityStore(sortByCloseTime, f.cfg, f.logger)
return newNoSQLVisibilityStore(sortByCloseTime, f.cfg, f.logger, f.dc)
}

// NewQueue returns a new queue backed by cassandra
func (f *Factory) NewQueue(queueType p.QueueType) (p.Queue, error) {
return newNoSQLQueueStore(f.cfg, f.logger, queueType)
return newNoSQLQueueStore(f.cfg, f.logger, queueType, f.dc)
}

// NewConfigStore returns a new config store
func (f *Factory) NewConfigStore() (p.ConfigStore, error) {
return NewNoSQLConfigStore(f.cfg, f.logger)
return NewNoSQLConfigStore(f.cfg, f.logger, f.dc)
}

// Close closes the factory
Expand All @@ -121,7 +123,7 @@ func (f *Factory) executionStoreFactory() (*executionStoreFactory, error) {
return f.execStoreFactory, nil
}

factory, err := newExecutionStoreFactory(f.cfg, f.logger)
factory, err := newExecutionStoreFactory(f.cfg, f.logger, f.dc)
if err != nil {
return nil, err
}
Expand All @@ -133,9 +135,10 @@ func (f *Factory) executionStoreFactory() (*executionStoreFactory, error) {
func newExecutionStoreFactory(
cfg config.Cassandra,
logger log.Logger,
dc *p.DynamicConfiguration,
) (*executionStoreFactory, error) {

db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosqlConfigStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ type (
func NewNoSQLConfigStore(
cfg config.NoSQL,
logger log.Logger,
dc *persistence.DynamicConfiguration,
) (persistence.ConfigStore, error) {
db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosqlDomainStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ func newNoSQLDomainStore(
cfg config.NoSQL,
currentClusterName string,
logger log.Logger,
dc *p.DynamicConfiguration,
) (p.DomainStore, error) {
db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosqlHistoryStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ func NewNoSQLHistoryStoreFromSession(
func newNoSQLHistoryStore(
cfg config.NoSQL,
logger log.Logger,
dc *p.DynamicConfiguration,
) (p.HistoryStore, error) {
db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions common/persistence/nosql/nosqlPersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/persistence-tests/testcluster"
)

Expand Down Expand Up @@ -78,7 +79,7 @@ func (s *testCluster) Config() config.Persistence {

// SetupTestDatabase from PersistenceTestCluster interface
func (s *testCluster) SetupTestDatabase() {
adminDB, err := NewNoSQLAdminDB(&s.cfg, loggerimpl.NewNopLogger())
adminDB, err := NewNoSQLAdminDB(&s.cfg, loggerimpl.NewNopLogger(), &persistence.DynamicConfiguration{})

if err != nil {
log.Fatal(err)
Expand All @@ -91,7 +92,7 @@ func (s *testCluster) SetupTestDatabase() {

// TearDownTestDatabase from PersistenceTestCluster interface
func (s *testCluster) TearDownTestDatabase() {
adminDB, err := NewNoSQLAdminDB(&s.cfg, loggerimpl.NewNopLogger())
adminDB, err := NewNoSQLAdminDB(&s.cfg, loggerimpl.NewNopLogger(), &persistence.DynamicConfiguration{})
if err != nil {
log.Fatal(err)
}
Expand Down
4 changes: 3 additions & 1 deletion common/persistence/nosql/nosqlQueueStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/types"
)
Expand All @@ -46,8 +47,9 @@ func newNoSQLQueueStore(
cfg config.NoSQL,
logger log.Logger,
queueType persistence.QueueType,
dc *p.DynamicConfiguration,
) (persistence.Queue, error) {
db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosqlShardStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ func newNoSQLShardStore(
cfg config.NoSQL,
clusterName string,
logger log.Logger,
dc *p.DynamicConfiguration,
) (p.ShardStore, error) {
db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosqlTaskStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ var _ p.TaskStore = (*nosqlTaskStore)(nil)
func newNoSQLTaskStore(
cfg config.NoSQL,
logger log.Logger,
dc *p.DynamicConfiguration,
) (p.TaskStore, error) {
db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosqlVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ func newNoSQLVisibilityStore(
listClosedOrderingByCloseTime bool,
cfg config.NoSQL,
logger log.Logger,
dc *p.DynamicConfiguration,
) (p.VisibilityStore, error) {
db, err := NewNoSQLDB(&cfg, logger)
db, err := NewNoSQLDB(&cfg, logger, dc)
if err != nil {
return nil, err
}
Expand Down
35 changes: 22 additions & 13 deletions common/persistence/nosql/nosqlplugin/cassandra/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
)
Expand All @@ -34,17 +35,19 @@ type cdb struct {
client gocql.Client
session gocql.Session
cfg *config.NoSQL
dc *persistence.DynamicConfiguration
}

var _ nosqlplugin.DB = (*cdb)(nil)

// newCassandraDBFromSession returns a DB from a session
func newCassandraDBFromSession(cfg *config.NoSQL, session gocql.Session, logger log.Logger) *cdb {
func newCassandraDBFromSession(cfg *config.NoSQL, session gocql.Session, logger log.Logger, dc *persistence.DynamicConfiguration) *cdb {
return &cdb{
client: gocql.GetRegisteredClient(),
session: session,
logger: logger,
cfg: cfg,
dc: dc,
}
}

Expand Down Expand Up @@ -79,23 +82,29 @@ func (db *cdb) isCassandraConsistencyError(err error) bool {
}

func (db *cdb) executeWithConsistencyAll(q gocql.Query) error {
if err := q.Consistency(cassandraAllConslevel).Exec(); err != nil {
if db.isCassandraConsistencyError(err) {
db.logger.Warn("unable to complete the delete operation due to consistency issue", tag.Error(err))
return q.Consistency(cassandraDefaultConsLevel).Exec()
if db.dc != nil && db.dc.EnableCassandraAllConsistencyLevelDelete() {
if err := q.Consistency(cassandraAllConslevel).Exec(); err != nil {
if db.isCassandraConsistencyError(err) {
db.logger.Warn("unable to complete the delete operation due to consistency issue", tag.Error(err))
return q.Consistency(cassandraDefaultConsLevel).Exec()
}
return err
}
return err
return nil
}
return nil
return q.Exec()
}

func (db *cdb) executeBatchWithConsistencyAll(b gocql.Batch) error {
if err := db.session.ExecuteBatch(b.Consistency(cassandraAllConslevel)); err != nil {
if db.isCassandraConsistencyError(err) {
db.logger.Warn("unable to complete the delete operation due to consistency issue", tag.Error(err))
return db.session.ExecuteBatch(b.Consistency(cassandraDefaultConsLevel))
if db.dc != nil && db.dc.EnableCassandraAllConsistencyLevelDelete() {
if err := db.session.ExecuteBatch(b.Consistency(cassandraAllConslevel)); err != nil {
if db.isCassandraConsistencyError(err) {
db.logger.Warn("unable to complete the delete operation due to consistency issue", tag.Error(err))
return db.session.ExecuteBatch(b.Consistency(cassandraDefaultConsLevel))
}
return err
}
return err
return nil
}
return nil
return db.session.ExecuteBatch(b)
}
13 changes: 7 additions & 6 deletions common/persistence/nosql/nosqlplugin/cassandra/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
Expand All @@ -47,12 +48,12 @@ func init() {
}

// CreateDB initialize the db object
func (p *plugin) CreateDB(cfg *config.NoSQL, logger log.Logger) (nosqlplugin.DB, error) {
return p.doCreateDB(cfg, logger)
func (p *plugin) CreateDB(cfg *config.NoSQL, logger log.Logger, dc *persistence.DynamicConfiguration) (nosqlplugin.DB, error) {
return p.doCreateDB(cfg, logger, dc)
}

// CreateAdminDB initialize the AdminDB object
func (p *plugin) CreateAdminDB(cfg *config.NoSQL, logger log.Logger) (nosqlplugin.AdminDB, error) {
func (p *plugin) CreateAdminDB(cfg *config.NoSQL, logger log.Logger, dc *persistence.DynamicConfiguration) (nosqlplugin.AdminDB, error) {
// the keyspace is not created yet, so use empty and let the Cassandra connect
keyspace := cfg.Keyspace
cfg.Keyspace = ""
Expand All @@ -61,15 +62,15 @@ func (p *plugin) CreateAdminDB(cfg *config.NoSQL, logger log.Logger) (nosqlplugi
cfg.Keyspace = keyspace
}()

return p.doCreateDB(cfg, logger)
return p.doCreateDB(cfg, logger, dc)
}

func (p *plugin) doCreateDB(cfg *config.NoSQL, logger log.Logger) (*cdb, error) {
func (p *plugin) doCreateDB(cfg *config.NoSQL, logger log.Logger, dc *persistence.DynamicConfiguration) (*cdb, error) {
session, err := gocql.GetRegisteredClient().CreateSession(toGoCqlConfig(cfg))
if err != nil {
return nil, err
}
db := newCassandraDBFromSession(cfg, session, logger)
db := newCassandraDBFromSession(cfg, session, logger, dc)
return db, nil
}

Expand Down
Loading