Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#47589
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
hawkingrei authored and ti-chi-bot committed Feb 19, 2024
1 parent 6d13674 commit 4940877
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 2 deletions.
4 changes: 2 additions & 2 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ func TestAnalyzePartitionTableByConcurrencyInDynamic(t *testing.T) {
for _, tc := range testcases {
concurrency := tc.concurrency
fmt.Println("testcase ", concurrency)
tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
tk.MustExec(fmt.Sprintf("set @@global.tidb_merge_partition_stats_concurrency=%v", concurrency))
tk.MustQuery("select @@global.tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
tk.MustExec(fmt.Sprintf("set @@tidb_analyze_partition_concurrency=%v", concurrency))
tk.MustQuery("select @@tidb_analyze_partition_concurrency").Check(testkit.Rows(concurrency))

Expand Down
203 changes: 203 additions & 0 deletions statistics/handle/util/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"context"
"strconv"
"time"

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/oracle"
)

// SessionPool is used to recycle sessionctx.
type SessionPool interface {
Get() (pools.Resource, error)
Put(pools.Resource)
}

// StatsCtx is used to mark the request is from stats module.
func StatsCtx(ctx context.Context) context.Context {
return kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
}

// FinishTransaction will execute `commit` when error is nil, otherwise `rollback`.
func FinishTransaction(sctx sessionctx.Context, err error) error {
if err == nil {
_, err = Exec(sctx, "commit")
} else {
_, err1 := Exec(sctx, "rollback")
terror.Log(errors.Trace(err1))
}
return errors.Trace(err)
}

var (
// FlagWrapTxn indicates whether to wrap a transaction.
FlagWrapTxn = 0
)

// CallWithSCtx allocates a sctx from the pool and call the f().
func CallWithSCtx(pool SessionPool, f func(sctx sessionctx.Context) error, flags ...int) (err error) {
se, err := pool.Get()
if err != nil {
return err
}
defer func() {
if err == nil { // only recycle when no error
pool.Put(se)
}
}()
sctx := se.(sessionctx.Context)
if err := UpdateSCtxVarsForStats(sctx); err != nil { // update stats variables automatically
return err
}

wrapTxn := false
for _, flag := range flags {
if flag == FlagWrapTxn {
wrapTxn = true
}
}
if wrapTxn {
err = WrapTxn(sctx, f)
} else {
err = f(sctx)
}
return err
}

// UpdateSCtxVarsForStats updates all necessary variables that may affect the behavior of statistics.
func UpdateSCtxVarsForStats(sctx sessionctx.Context) error {
// analyzer version
verInString, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeVersion)
if err != nil {
return err
}
ver, err := strconv.ParseInt(verInString, 10, 64)
if err != nil {
return err
}
sctx.GetSessionVars().AnalyzeVersion = int(ver)

// enable historical stats
val, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableHistoricalStats)
if err != nil {
return err
}
sctx.GetSessionVars().EnableHistoricalStats = variable.TiDBOptOn(val)

// partition mode
pruneMode, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBPartitionPruneMode)
if err != nil {
return err
}
sctx.GetSessionVars().PartitionPruneMode.Store(pruneMode)

// enable analyze snapshot
analyzeSnapshot, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableAnalyzeSnapshot)
if err != nil {
return err
}
sctx.GetSessionVars().EnableAnalyzeSnapshot = variable.TiDBOptOn(analyzeSnapshot)

// enable skip column types
val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeSkipColumnTypes)
if err != nil {
return err
}
sctx.GetSessionVars().AnalyzeSkipColumnTypes = variable.ParseAnalyzeSkipColumnTypes(val)

// skip missing partition stats
val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSkipMissingPartitionStats)
if err != nil {
return err
}
sctx.GetSessionVars().SkipMissingPartitionStats = variable.TiDBOptOn(val)
verInString, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBMergePartitionStatsConcurrency)
if err != nil {
return err
}
ver, err = strconv.ParseInt(verInString, 10, 64)
if err != nil {
return err
}
sctx.GetSessionVars().AnalyzePartitionMergeConcurrency = int(ver)
return nil
}

// WrapTxn uses a transaction here can let different SQLs in this operation have the same data visibility.
func WrapTxn(sctx sessionctx.Context, f func(sctx sessionctx.Context) error) (err error) {
// TODO: check whether this sctx is already in a txn
if _, err := Exec(sctx, "begin"); err != nil {
return err
}
defer func() {
err = FinishTransaction(sctx, err)
}()
err = f(sctx)
return
}

// GetStartTS gets the start ts from current transaction.
func GetStartTS(sctx sessionctx.Context) (uint64, error) {
txn, err := sctx.Txn(true)
if err != nil {
return 0, err
}
return txn.StartTS(), nil
}

// Exec is a helper function to execute sql and return RecordSet.
func Exec(sctx sessionctx.Context, sql string, args ...interface{}) (sqlexec.RecordSet, error) {
sqlExec, ok := sctx.(sqlexec.SQLExecutor)
if !ok {
return nil, errors.Errorf("invalid sql executor")
}
// TODO: use RestrictedSQLExecutor + ExecOptionUseCurSession instead of SQLExecutor
return sqlExec.ExecuteInternal(StatsCtx(context.Background()), sql, args...)
}

// ExecRows is a helper function to execute sql and return rows and fields.
func ExecRows(sctx sessionctx.Context, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) {
sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor)
if !ok {
return nil, nil, errors.Errorf("invalid sql executor")
}
return sqlExec.ExecRestrictedSQL(StatsCtx(context.Background()), []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, args...)
}

// ExecWithOpts is a helper function to execute sql and return rows and fields.
func ExecWithOpts(sctx sessionctx.Context, opts []sqlexec.OptionFuncAlias, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) {
sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor)
if !ok {
return nil, nil, errors.Errorf("invalid sql executor")
}
return sqlExec.ExecRestrictedSQL(StatsCtx(context.Background()), opts, sql, args...)
}

// DurationToTS converts duration to timestamp.
func DurationToTS(d time.Duration) uint64 {
return oracle.ComposeTS(d.Nanoseconds()/int64(time.Millisecond), 0)
}

0 comments on commit 4940877

Please sign in to comment.