From c185086080c0c3a7e828c8880e21f5aac684d58e Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 12 Oct 2023 22:54:27 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #47589 Signed-off-by: ti-chi-bot --- executor/analyze_test.go | 4 +- statistics/handle/util/util.go | 203 +++++++++++++++++++++++++++++++++ 2 files changed, 205 insertions(+), 2 deletions(-) create mode 100644 statistics/handle/util/util.go diff --git a/executor/analyze_test.go b/executor/analyze_test.go index a6cdea833df50..e6dfa2df4db5b 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -369,8 +369,8 @@ func TestAnalyzePartitionTableByConcurrencyInDynamic(t *testing.T) { for _, tc := range testcases { concurrency := tc.concurrency fmt.Println("testcase ", concurrency) - tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency)) - tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency)) + tk.MustExec(fmt.Sprintf("set @@global.tidb_merge_partition_stats_concurrency=%v", concurrency)) + tk.MustQuery("select @@global.tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency)) tk.MustExec(fmt.Sprintf("set @@tidb_analyze_partition_concurrency=%v", concurrency)) tk.MustQuery("select @@tidb_analyze_partition_concurrency").Check(testkit.Rows(concurrency)) diff --git a/statistics/handle/util/util.go b/statistics/handle/util/util.go new file mode 100644 index 0000000000000..dd581446880ea --- /dev/null +++ b/statistics/handle/util/util.go @@ -0,0 +1,203 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "context" + "strconv" + "time" + + "github.com/ngaut/pools" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/sqlexec" + "github.com/tikv/client-go/v2/oracle" +) + +// SessionPool is used to recycle sessionctx. +type SessionPool interface { + Get() (pools.Resource, error) + Put(pools.Resource) +} + +// StatsCtx is used to mark the request is from stats module. +func StatsCtx(ctx context.Context) context.Context { + return kv.WithInternalSourceType(ctx, kv.InternalTxnStats) +} + +// FinishTransaction will execute `commit` when error is nil, otherwise `rollback`. +func FinishTransaction(sctx sessionctx.Context, err error) error { + if err == nil { + _, err = Exec(sctx, "commit") + } else { + _, err1 := Exec(sctx, "rollback") + terror.Log(errors.Trace(err1)) + } + return errors.Trace(err) +} + +var ( + // FlagWrapTxn indicates whether to wrap a transaction. + FlagWrapTxn = 0 +) + +// CallWithSCtx allocates a sctx from the pool and call the f(). +func CallWithSCtx(pool SessionPool, f func(sctx sessionctx.Context) error, flags ...int) (err error) { + se, err := pool.Get() + if err != nil { + return err + } + defer func() { + if err == nil { // only recycle when no error + pool.Put(se) + } + }() + sctx := se.(sessionctx.Context) + if err := UpdateSCtxVarsForStats(sctx); err != nil { // update stats variables automatically + return err + } + + wrapTxn := false + for _, flag := range flags { + if flag == FlagWrapTxn { + wrapTxn = true + } + } + if wrapTxn { + err = WrapTxn(sctx, f) + } else { + err = f(sctx) + } + return err +} + +// UpdateSCtxVarsForStats updates all necessary variables that may affect the behavior of statistics. +func UpdateSCtxVarsForStats(sctx sessionctx.Context) error { + // analyzer version + verInString, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeVersion) + if err != nil { + return err + } + ver, err := strconv.ParseInt(verInString, 10, 64) + if err != nil { + return err + } + sctx.GetSessionVars().AnalyzeVersion = int(ver) + + // enable historical stats + val, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableHistoricalStats) + if err != nil { + return err + } + sctx.GetSessionVars().EnableHistoricalStats = variable.TiDBOptOn(val) + + // partition mode + pruneMode, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBPartitionPruneMode) + if err != nil { + return err + } + sctx.GetSessionVars().PartitionPruneMode.Store(pruneMode) + + // enable analyze snapshot + analyzeSnapshot, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableAnalyzeSnapshot) + if err != nil { + return err + } + sctx.GetSessionVars().EnableAnalyzeSnapshot = variable.TiDBOptOn(analyzeSnapshot) + + // enable skip column types + val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeSkipColumnTypes) + if err != nil { + return err + } + sctx.GetSessionVars().AnalyzeSkipColumnTypes = variable.ParseAnalyzeSkipColumnTypes(val) + + // skip missing partition stats + val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSkipMissingPartitionStats) + if err != nil { + return err + } + sctx.GetSessionVars().SkipMissingPartitionStats = variable.TiDBOptOn(val) + verInString, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBMergePartitionStatsConcurrency) + if err != nil { + return err + } + ver, err = strconv.ParseInt(verInString, 10, 64) + if err != nil { + return err + } + sctx.GetSessionVars().AnalyzePartitionMergeConcurrency = int(ver) + return nil +} + +// WrapTxn uses a transaction here can let different SQLs in this operation have the same data visibility. +func WrapTxn(sctx sessionctx.Context, f func(sctx sessionctx.Context) error) (err error) { + // TODO: check whether this sctx is already in a txn + if _, err := Exec(sctx, "begin"); err != nil { + return err + } + defer func() { + err = FinishTransaction(sctx, err) + }() + err = f(sctx) + return +} + +// GetStartTS gets the start ts from current transaction. +func GetStartTS(sctx sessionctx.Context) (uint64, error) { + txn, err := sctx.Txn(true) + if err != nil { + return 0, err + } + return txn.StartTS(), nil +} + +// Exec is a helper function to execute sql and return RecordSet. +func Exec(sctx sessionctx.Context, sql string, args ...interface{}) (sqlexec.RecordSet, error) { + sqlExec, ok := sctx.(sqlexec.SQLExecutor) + if !ok { + return nil, errors.Errorf("invalid sql executor") + } + // TODO: use RestrictedSQLExecutor + ExecOptionUseCurSession instead of SQLExecutor + return sqlExec.ExecuteInternal(StatsCtx(context.Background()), sql, args...) +} + +// ExecRows is a helper function to execute sql and return rows and fields. +func ExecRows(sctx sessionctx.Context, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) { + sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor) + if !ok { + return nil, nil, errors.Errorf("invalid sql executor") + } + return sqlExec.ExecRestrictedSQL(StatsCtx(context.Background()), []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, args...) +} + +// ExecWithOpts is a helper function to execute sql and return rows and fields. +func ExecWithOpts(sctx sessionctx.Context, opts []sqlexec.OptionFuncAlias, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) { + sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor) + if !ok { + return nil, nil, errors.Errorf("invalid sql executor") + } + return sqlExec.ExecRestrictedSQL(StatsCtx(context.Background()), opts, sql, args...) +} + +// DurationToTS converts duration to timestamp. +func DurationToTS(d time.Duration) uint64 { + return oracle.ComposeTS(d.Nanoseconds()/int64(time.Millisecond), 0) +} From a78579201ccd9b7a5c9a092a6a269a0e0469d5ff Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 19 Feb 2024 15:14:34 +0800 Subject: [PATCH 2/2] test Signed-off-by: Weizhen Wang --- statistics/handle/handle.go | 11 +- statistics/handle/util/util.go | 203 --------------------------------- 2 files changed, 10 insertions(+), 204 deletions(-) delete mode 100644 statistics/handle/util/util.go diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 04779340d36ee..e0ee5f0e991d9 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -641,7 +641,16 @@ func (h *Handle) UpdateSessionVar() error { return err } h.mu.ctx.GetSessionVars().AnalyzeVersion = int(ver) - return err + verInString, err = h.mu.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBMergePartitionStatsConcurrency) + if err != nil { + return err + } + ver, err = strconv.ParseInt(verInString, 10, 64) + if err != nil { + return err + } + h.mu.ctx.GetSessionVars().AnalyzePartitionMergeConcurrency = int(ver) + return nil } // GlobalStats is used to store the statistics contained in the global-level stats diff --git a/statistics/handle/util/util.go b/statistics/handle/util/util.go deleted file mode 100644 index dd581446880ea..0000000000000 --- a/statistics/handle/util/util.go +++ /dev/null @@ -1,203 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package util - -import ( - "context" - "strconv" - "time" - - "github.com/ngaut/pools" - "github.com/pingcap/errors" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/ast" - "github.com/pingcap/tidb/parser/terror" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/sqlexec" - "github.com/tikv/client-go/v2/oracle" -) - -// SessionPool is used to recycle sessionctx. -type SessionPool interface { - Get() (pools.Resource, error) - Put(pools.Resource) -} - -// StatsCtx is used to mark the request is from stats module. -func StatsCtx(ctx context.Context) context.Context { - return kv.WithInternalSourceType(ctx, kv.InternalTxnStats) -} - -// FinishTransaction will execute `commit` when error is nil, otherwise `rollback`. -func FinishTransaction(sctx sessionctx.Context, err error) error { - if err == nil { - _, err = Exec(sctx, "commit") - } else { - _, err1 := Exec(sctx, "rollback") - terror.Log(errors.Trace(err1)) - } - return errors.Trace(err) -} - -var ( - // FlagWrapTxn indicates whether to wrap a transaction. - FlagWrapTxn = 0 -) - -// CallWithSCtx allocates a sctx from the pool and call the f(). -func CallWithSCtx(pool SessionPool, f func(sctx sessionctx.Context) error, flags ...int) (err error) { - se, err := pool.Get() - if err != nil { - return err - } - defer func() { - if err == nil { // only recycle when no error - pool.Put(se) - } - }() - sctx := se.(sessionctx.Context) - if err := UpdateSCtxVarsForStats(sctx); err != nil { // update stats variables automatically - return err - } - - wrapTxn := false - for _, flag := range flags { - if flag == FlagWrapTxn { - wrapTxn = true - } - } - if wrapTxn { - err = WrapTxn(sctx, f) - } else { - err = f(sctx) - } - return err -} - -// UpdateSCtxVarsForStats updates all necessary variables that may affect the behavior of statistics. -func UpdateSCtxVarsForStats(sctx sessionctx.Context) error { - // analyzer version - verInString, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeVersion) - if err != nil { - return err - } - ver, err := strconv.ParseInt(verInString, 10, 64) - if err != nil { - return err - } - sctx.GetSessionVars().AnalyzeVersion = int(ver) - - // enable historical stats - val, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableHistoricalStats) - if err != nil { - return err - } - sctx.GetSessionVars().EnableHistoricalStats = variable.TiDBOptOn(val) - - // partition mode - pruneMode, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBPartitionPruneMode) - if err != nil { - return err - } - sctx.GetSessionVars().PartitionPruneMode.Store(pruneMode) - - // enable analyze snapshot - analyzeSnapshot, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableAnalyzeSnapshot) - if err != nil { - return err - } - sctx.GetSessionVars().EnableAnalyzeSnapshot = variable.TiDBOptOn(analyzeSnapshot) - - // enable skip column types - val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeSkipColumnTypes) - if err != nil { - return err - } - sctx.GetSessionVars().AnalyzeSkipColumnTypes = variable.ParseAnalyzeSkipColumnTypes(val) - - // skip missing partition stats - val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSkipMissingPartitionStats) - if err != nil { - return err - } - sctx.GetSessionVars().SkipMissingPartitionStats = variable.TiDBOptOn(val) - verInString, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBMergePartitionStatsConcurrency) - if err != nil { - return err - } - ver, err = strconv.ParseInt(verInString, 10, 64) - if err != nil { - return err - } - sctx.GetSessionVars().AnalyzePartitionMergeConcurrency = int(ver) - return nil -} - -// WrapTxn uses a transaction here can let different SQLs in this operation have the same data visibility. -func WrapTxn(sctx sessionctx.Context, f func(sctx sessionctx.Context) error) (err error) { - // TODO: check whether this sctx is already in a txn - if _, err := Exec(sctx, "begin"); err != nil { - return err - } - defer func() { - err = FinishTransaction(sctx, err) - }() - err = f(sctx) - return -} - -// GetStartTS gets the start ts from current transaction. -func GetStartTS(sctx sessionctx.Context) (uint64, error) { - txn, err := sctx.Txn(true) - if err != nil { - return 0, err - } - return txn.StartTS(), nil -} - -// Exec is a helper function to execute sql and return RecordSet. -func Exec(sctx sessionctx.Context, sql string, args ...interface{}) (sqlexec.RecordSet, error) { - sqlExec, ok := sctx.(sqlexec.SQLExecutor) - if !ok { - return nil, errors.Errorf("invalid sql executor") - } - // TODO: use RestrictedSQLExecutor + ExecOptionUseCurSession instead of SQLExecutor - return sqlExec.ExecuteInternal(StatsCtx(context.Background()), sql, args...) -} - -// ExecRows is a helper function to execute sql and return rows and fields. -func ExecRows(sctx sessionctx.Context, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) { - sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor) - if !ok { - return nil, nil, errors.Errorf("invalid sql executor") - } - return sqlExec.ExecRestrictedSQL(StatsCtx(context.Background()), []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, args...) -} - -// ExecWithOpts is a helper function to execute sql and return rows and fields. -func ExecWithOpts(sctx sessionctx.Context, opts []sqlexec.OptionFuncAlias, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) { - sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor) - if !ok { - return nil, nil, errors.Errorf("invalid sql executor") - } - return sqlExec.ExecRestrictedSQL(StatsCtx(context.Background()), opts, sql, args...) -} - -// DurationToTS converts duration to timestamp. -func DurationToTS(d time.Duration) uint64 { - return oracle.ComposeTS(d.Nanoseconds()/int64(time.Millisecond), 0) -}